This is an automated email from the ASF dual-hosted git repository.
w41ter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new bca4689a5a4 [fix](cloud) fix unnecessary conflict range in lazy commit
(#60274)
bca4689a5a4 is described below
commit bca4689a5a425b7e0d8e96efa31a2d13a38f0968
Author: walter <[email protected]>
AuthorDate: Thu Jan 29 15:59:02 2026 +0800
[fix](cloud) fix unnecessary conflict range in lazy commit (#60274)
Before:
```
I20260127 07:50:10.853658 2639096 meta_service_util.cpp:78] update table
version txn_id=885400033280 db_id=15246789 table_id=42356789
I20260127 07:50:10.853765 2639096 meta_service_txn.cpp:2381] put_size=160197 del_size=0
num_put_keys=2002 num_del_keys=0 txn_size=12976511 txn_id=885400033280
W20260127 07:50:10.924083 2639096 txn_kv.cpp:782] fdb commit error, c [...]
```
After:
```
I20260127 09:02:46.518415 2656919 meta_service_util.cpp:78] update table
version txn_id=5329084161024 db_id=15246789 table_id=42356789
I20260127 09:02:46.518524 2656919 meta_service_txn.cpp:2402]
put_size=162198 del_size=0 num_put_keys=2002 num_del_keys=0 txn_size=690512
txn_id=5329084161024 the underlying txn size=846643
I20260127 09:02:46.554267 2657045 txn_lazy_committer.cpp:658] lazy task
commit txn_id=5329084161024 retry_times=0
```
---
cloud/src/meta-service/meta_service_txn.cpp | 36 ++++++++-
cloud/src/meta-store/mem_txn_kv.h | 2 +-
cloud/src/meta-store/txn_kv.cpp | 53 ++++++++++++-
cloud/src/meta-store/txn_kv.h | 6 +-
cloud/test/txn_lazy_commit_test.cpp | 117 +++++++++++++++++++++++++---
5 files changed, 191 insertions(+), 23 deletions(-)
diff --git a/cloud/src/meta-service/meta_service_txn.cpp
b/cloud/src/meta-service/meta_service_txn.cpp
index e7f76f2b7ff..8d26652e0a1 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -1312,7 +1312,7 @@ std::pair<MetaServiceCode, std::string>
get_tablet_indexes(
}
TxnErrorCode err = txn->batch_get(&tablet_idx_values, tablet_idx_keys,
- Transaction::BatchGetOptions(false));
+ Transaction::BatchGetOptions(snapshot));
if (err != TxnErrorCode::TXN_OK) {
auto msg = fmt::format("failed to get tablet table index ids, err={}",
err);
LOG_WARNING(msg);
@@ -1891,6 +1891,9 @@ void MetaServiceImpl::commit_txn_immediately(
if (err != TxnErrorCode::TXN_OK) {
if (err == TxnErrorCode::TXN_CONFLICT) {
g_bvar_delete_bitmap_lock_txn_remove_conflict_by_load_counter
<< 1;
+ } else if (err == TxnErrorCode::TXN_BYTES_TOO_LARGE) {
+ LOG(WARNING) << "commit txn failed due to txn size too large,
txn_id=" << txn_id
+ << " the underlying txn size=" <<
txn->approximate_bytes(true);
}
code = cast_as<ErrCategory::COMMIT>(err);
ss << "failed to commit kv txn, txn_id=" << txn_id << " err=" <<
err;
@@ -2079,13 +2082,13 @@ void MetaServiceImpl::commit_txn_eventually(
std::ranges::views::transform(
[](const auto& pair) { return pair.second.tablet_id();
}));
if (!is_versioned_read) {
- std::tie(code, msg) =
- get_tablet_indexes(txn.get(), &tablet_ids, instance_id,
acquired_tablet_ids);
+ std::tie(code, msg) = get_tablet_indexes(txn.get(), &tablet_ids,
instance_id,
+ acquired_tablet_ids,
true);
if (code != MetaServiceCode::OK) {
return;
}
} else {
- err = meta_reader.get_tablet_indexes(txn.get(),
acquired_tablet_ids, &tablet_ids);
+ err = meta_reader.get_tablet_indexes(txn.get(),
acquired_tablet_ids, &tablet_ids, true);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get tablet indexes, err={}", err);
@@ -2113,6 +2116,24 @@ void MetaServiceImpl::commit_txn_eventually(
continue;
}
+ stats.get_bytes += txn->get_bytes();
+ stats.put_bytes += txn->put_bytes();
+ stats.del_bytes += txn->delete_bytes();
+ stats.get_counter += txn->num_get_keys();
+ stats.put_counter += txn->num_put_keys();
+ stats.del_counter += txn->num_del_keys();
+
+ // Recreate the txn so that it is not too old to perform reads or be committed
+ txn.reset();
+ err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::CREATE>(err);
+ ss << "failed to create txn, txn_id=" << txn_id << " err=" << err;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+
CommitTxnLogPB commit_txn_log;
commit_txn_log.set_txn_id(txn_id);
commit_txn_log.set_db_id(db_id);
@@ -2384,6 +2405,9 @@ void MetaServiceImpl::commit_txn_eventually(
if (err != TxnErrorCode::TXN_OK) {
if (err == TxnErrorCode::TXN_CONFLICT) {
g_bvar_delete_bitmap_lock_txn_remove_conflict_by_load_counter
<< 1;
+ } else if (err == TxnErrorCode::TXN_BYTES_TOO_LARGE) {
+ LOG(WARNING) << "commit txn failed due to txn size too large,
txn_id=" << txn_id
+ << " the underlying txn size=" <<
txn->approximate_bytes(true);
}
code = cast_as<ErrCategory::COMMIT>(err);
ss << "failed to commit kv txn, txn_id=" << txn_id << " err=" <<
err;
@@ -2931,6 +2955,10 @@ void MetaServiceImpl::commit_txn_with_sub_txn(const
CommitTxnRequest* request,
if (err != TxnErrorCode::TXN_OK) {
if (err == TxnErrorCode::TXN_CONFLICT) {
g_bvar_delete_bitmap_lock_txn_remove_conflict_by_load_counter
<< 1;
+ } else if (err == TxnErrorCode::TXN_BYTES_TOO_LARGE) {
+ LOG(WARNING) << "commit txn with sub txn failed due to txn
size too large, txn_id="
+ << txn_id
+ << " the underlying txn size=" <<
txn->approximate_bytes(true);
}
code = cast_as<ErrCategory::COMMIT>(err);
ss << "failed to commit kv txn with sub txn, txn_id=" << txn_id <<
" err=" << err;
diff --git a/cloud/src/meta-store/mem_txn_kv.h
b/cloud/src/meta-store/mem_txn_kv.h
index d7edd1bd5af..356fe3cedf6 100644
--- a/cloud/src/meta-store/mem_txn_kv.h
+++ b/cloud/src/meta-store/mem_txn_kv.h
@@ -245,7 +245,7 @@ public:
const std::vector<std::pair<std::string,
std::string>>& ranges,
const BatchGetOptions& opts = BatchGetOptions())
override;
- size_t approximate_bytes() const override { return approximate_bytes_; }
+ size_t approximate_bytes(bool = false) const override { return
approximate_bytes_; }
size_t num_get_keys() const override { return num_get_keys_; }
diff --git a/cloud/src/meta-store/txn_kv.cpp b/cloud/src/meta-store/txn_kv.cpp
index 6623fedb5dc..d55bcec618c 100644
--- a/cloud/src/meta-store/txn_kv.cpp
+++ b/cloud/src/meta-store/txn_kv.cpp
@@ -533,7 +533,10 @@ TxnErrorCode Transaction::get(std::string_view key,
std::string* val, bool snaps
may_logging_single_version_reading(key);
StopWatch sw;
- approximate_bytes_ += key.size() * 2; // See
fdbclient/ReadYourWrites.actor.cpp for details
+ if (!snapshot) {
+ // See fdbclient/ReadYourWrites.actor.cpp for details
+ approximate_bytes_ += key.size() * 2;
+ }
auto* fut = fdb_transaction_get(txn_, (uint8_t*)key.data(), key.size(),
snapshot);
g_bvar_txn_kv_get_count_normalized << 1;
@@ -577,7 +580,10 @@ TxnErrorCode Transaction::get(std::string_view begin,
std::string_view end,
may_logging_single_version_reading(begin);
StopWatch sw;
- approximate_bytes_ += begin.size() + end.size();
+ if (!opts.snapshot) {
+ // See fdbclient/ReadYourWrites.actor.cpp for details
+ approximate_bytes_ += begin.size() * 2 + end.size() * 2;
+ }
DORIS_CLOUD_DEFER {
g_bvar_txn_kv_range_get << sw.elapsed_us();
};
@@ -881,6 +887,41 @@ TxnErrorCode Transaction::abort() {
return TxnErrorCode::TXN_OK;
}
+size_t Transaction::approximate_bytes(bool fetch_from_underlying_kv) const {
+ if (!fetch_from_underlying_kv) {
+ return approximate_bytes_;
+ }
+
+ auto* fut = fdb_transaction_get_approximate_size(txn_);
+ DORIS_CLOUD_DEFER {
+ fdb_future_destroy(fut);
+ };
+
+ auto code = await_future(fut);
+ if (code != TxnErrorCode::TXN_OK) {
+ LOG(WARNING) << "failed to await future for
fdb_transaction_get_approximate_size, code="
+ << code;
+ return static_cast<size_t>(-1);
+ }
+
+ auto err = fdb_future_get_error(fut);
+ if (err) {
+ LOG(WARNING) << "failed to get approximate size, code=" << err
+ << " msg=" << fdb_get_error(err);
+ return static_cast<size_t>(-1);
+ }
+
+ int64_t size = 0;
+ err = fdb_future_get_int64(fut, &size);
+ if (err) {
+ LOG(WARNING) << "failed to extract int64 from approximate size future,
code=" << err
+ << " msg=" << fdb_get_error(err);
+ return static_cast<size_t>(-1);
+ }
+
+ return static_cast<size_t>(size);
+}
+
void Transaction::enable_get_versionstamp() {
versionstamp_enabled_ = true;
}
@@ -1143,7 +1184,9 @@ TxnErrorCode
Transaction::batch_get(std::vector<std::optional<std::string>>* res
may_logging_single_version_reading(k);
futures.emplace_back(
fdb_transaction_get(txn_, (uint8_t*)k.data(), k.size(),
opts.snapshot));
- approximate_bytes_ += k.size() * 2;
+ if (!opts.snapshot) {
+ approximate_bytes_ += k.size() * 2;
+ }
}
size_t num_futures = futures.size();
@@ -1221,7 +1264,9 @@ TxnErrorCode Transaction::batch_scan(
snapshot, reverse);
futures.emplace_back(fut);
- approximate_bytes_ += start.size() + end.size();
+ if (!opts.snapshot) {
+ approximate_bytes_ += start.size() + end.size();
+ }
}
size_t num_futures = futures.size();
diff --git a/cloud/src/meta-store/txn_kv.h b/cloud/src/meta-store/txn_kv.h
index ab97a1b4c97..8b9a5e69aa4 100644
--- a/cloud/src/meta-store/txn_kv.h
+++ b/cloud/src/meta-store/txn_kv.h
@@ -373,8 +373,10 @@ public:
/**
* @brief return the approximate bytes consumed by the underlying
transaction buffer.
+ * @param fetch_from_underlying_kv if true, use a heavy operation to get
the size from the underlying
+ * kv store; otherwise, return the tracked size. Default is
false.
**/
- virtual size_t approximate_bytes() const = 0;
+ virtual size_t approximate_bytes(bool fetch_from_underlying_kv = false)
const = 0;
/**
* @brief return the num get keys submitted to this txn.
@@ -821,7 +823,7 @@ public:
const std::vector<std::pair<std::string,
std::string>>& ranges,
const BatchGetOptions& opts = BatchGetOptions())
override;
- size_t approximate_bytes() const override { return approximate_bytes_; }
+ size_t approximate_bytes(bool fetch_from_underlying_kv = false) const
override;
size_t num_get_keys() const override { return num_get_keys_; }
diff --git a/cloud/test/txn_lazy_commit_test.cpp
b/cloud/test/txn_lazy_commit_test.cpp
index 1b3125970e3..b316a5e5f45 100644
--- a/cloud/test/txn_lazy_commit_test.cpp
+++ b/cloud/test/txn_lazy_commit_test.cpp
@@ -58,8 +58,27 @@ void repair_tablet_index(
bool is_versioned_write);
};
+static std::shared_ptr<TxnKv> txn_kv;
static doris::cloud::RecyclerThreadPoolGroup thread_group;
+static std::shared_ptr<TxnKv> get_fdb_txn_kv() {
+ if (txn_kv) {
+ return txn_kv;
+ }
+
+ int ret = 0;
+ doris::cloud::config::fdb_cluster_file_path = "fdb.cluster";
+ auto fdb_txn_kv = std::dynamic_pointer_cast<doris::cloud::TxnKv>(
+ std::make_shared<doris::cloud::FdbTxnKv>());
+ if (fdb_txn_kv != nullptr) {
+ ret = fdb_txn_kv->init();
+ [&] { ASSERT_EQ(ret, 0); }();
+ }
+ [&] { ASSERT_NE(fdb_txn_kv.get(), nullptr); }();
+ txn_kv = fdb_txn_kv;
+ return fdb_txn_kv;
+}
+
int main(int argc, char** argv) {
const std::string conf_file = "doris_cloud.conf";
if (!doris::cloud::config::init(conf_file.c_str(), true)) {
@@ -79,6 +98,9 @@ int main(int argc, char** argv) {
}
::testing::InitGoogleTest(&argc, argv);
+ // Initialize FDB
+ get_fdb_txn_kv();
+
auto s3_producer_pool =
std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
s3_producer_pool->start();
auto recycle_tablet_pool =
std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
@@ -238,18 +260,6 @@ static std::shared_ptr<TxnKv> get_mem_txn_kv() {
return txn_kv;
}
-static std::shared_ptr<TxnKv> get_fdb_txn_kv() {
- int ret = 0;
- cloud::config::fdb_cluster_file_path = "fdb.cluster";
- auto fdb_txn_kv =
std::dynamic_pointer_cast<cloud::TxnKv>(std::make_shared<cloud::FdbTxnKv>());
- if (fdb_txn_kv != nullptr) {
- ret = fdb_txn_kv->init();
- [&] { ASSERT_EQ(ret, 0); }();
- }
- [&] { ASSERT_NE(fdb_txn_kv.get(), nullptr); }();
- return fdb_txn_kv;
-}
-
static void check_tablet_idx_db_id(std::unique_ptr<Transaction>& txn, int64_t
db_id,
int64_t tablet_id) {
std::string mock_instance = "test_instance";
@@ -3336,4 +3346,87 @@ TEST(TxnLazyCommitTest,
CommitTxnEventuallyWithAbortAfterCommitTest) {
}
}
+TEST(TxnLazyCommitTest, CommitTxnEventuallyWithManyPartitions) {
+ auto txn_kv = get_fdb_txn_kv();
+ int64_t db_id = 15246789;
+ int64_t table_id = 42356789;
+ int64_t index_id = 98765432;
+
+ std::atomic_bool commit_txn_eventually_finish_hit = false;
+
+ auto sp = SyncPoint::get_instance();
+ DORIS_CLOUD_DEFER {
+ sp->clear_all_call_backs();
+ sp->disable_processing();
+ };
+ sp->set_call_back("commit_txn_eventually::task->wait", [&](auto&& args) {
+ auto [code, msg] = *try_any_cast<std::pair<MetaServiceCode,
std::string>*>(args[0]);
+ ASSERT_EQ(code, MetaServiceCode::OK);
+ commit_txn_eventually_finish_hit = true;
+ });
+
+ sp->enable_processing();
+
+ auto meta_service = get_meta_service(txn_kv, true);
+ brpc::Controller cntl;
+ BeginTxnRequest req;
+ req.set_cloud_unique_id("test_cloud_unique_id");
+ TxnInfoPB txn_info_pb;
+ txn_info_pb.set_db_id(db_id);
+ txn_info_pb.set_label("test_label_with_many_partitions");
+ txn_info_pb.add_table_ids(table_id);
+ txn_info_pb.set_timeout_ms(60000000);
+ req.mutable_txn_info()->CopyFrom(txn_info_pb);
+ BeginTxnResponse res;
+
meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req, &res,
+ nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK) <<
res.ShortDebugString();
+ int64_t txn_id = res.txn_id();
+
+ // mock rowset and tablet (a full stress run would use e.g. 2000 partitions, 64 tablets per partition)
+ int64_t tablet_id_base = 1000000;
+ int64_t partition_id_base = 5000000;
+
+ // Change the parameters below to larger values if you want to run a heavier stress
test
+ const int num_partitions = 20;
+ const int tablets_per_partition = 2;
+
+ for (int p = 0; p < num_partitions; ++p) {
+ int64_t partition_id = partition_id_base + p;
+ for (int t = 0; t < tablets_per_partition; ++t) {
+ int64_t tablet_id = tablet_id_base + p * tablets_per_partition + t;
+ create_tablet_with_db_id(meta_service.get(), db_id, table_id,
index_id, partition_id,
+ tablet_id);
+ auto tmp_rowset = create_rowset(txn_id, tablet_id, index_id,
partition_id);
+ CreateRowsetResponse res;
+ prepare_rowset(meta_service.get(), tmp_rowset, res);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ commit_rowset(meta_service.get(), tmp_rowset, res);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ }
+ }
+
+ {
+ brpc::Controller cntl;
+ CommitTxnRequest req;
+ req.set_cloud_unique_id("test_cloud_unique_id");
+ req.set_db_id(db_id);
+ req.set_txn_id(txn_id);
+ req.set_is_2pc(false);
+ req.set_enable_txn_lazy_commit(true);
+ CommitTxnResponse res;
+
meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req,
+ &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ }
+
+ for (size_t i = 0; i < 1000; i++) {
+ // just wait for a while to make sure the commit txn eventually task
is finished
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ if (commit_txn_eventually_finish_hit.load()) {
+ break;
+ }
+ }
+}
+
} // namespace doris::cloud
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]