This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 734f09060a0 branch-3.0: [fix](cloud) commit txn with sub txns should
consider lazy txn committer (#54653) (#55351)
734f09060a0 is described below
commit 734f09060a024fddfb80c51155271bfef3582f85
Author: meiyi <[email protected]>
AuthorDate: Thu Aug 28 17:47:38 2025 +0800
branch-3.0: [fix](cloud) commit txn with sub txns should consider lazy txn
committer (#54653) (#55351)
pick https://github.com/apache/doris/pull/54653
---
cloud/src/meta-service/meta_service_txn.cpp | 881 ++++++++++++++------------
cloud/src/meta-service/txn_lazy_committer.cpp | 1 +
cloud/test/txn_lazy_commit_test.cpp | 279 ++++++++
3 files changed, 745 insertions(+), 416 deletions(-)
diff --git a/cloud/src/meta-service/meta_service_txn.cpp
b/cloud/src/meta-service/meta_service_txn.cpp
index 2f3f0d15545..a800b41f83e 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -2005,11 +2005,14 @@ void commit_txn_eventually(
* t2: t2_p3(4), t2_p4(4)
*/
void commit_txn_with_sub_txn(const CommitTxnRequest* request,
CommitTxnResponse* response,
- std::shared_ptr<TxnKv>& txn_kv, MetaServiceCode&
code,
- std::string& msg, const std::string& instance_id,
KVStats& stats) {
+ std::shared_ptr<TxnKv>& txn_kv,
+ std::shared_ptr<TxnLazyCommitter>&
txn_lazy_committer,
+ MetaServiceCode& code, std::string& msg,
+ const std::string& instance_id, KVStats& stats) {
std::stringstream ss;
int64_t txn_id = request->txn_id();
auto sub_txn_infos = request->sub_txn_infos();
+
// Create a readonly txn for scan tmp rowset
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv->create_txn(&txn);
@@ -2020,15 +2023,6 @@ void commit_txn_with_sub_txn(const CommitTxnRequest*
request, CommitTxnResponse*
LOG(WARNING) << msg;
return;
}
- DORIS_CLOUD_DEFER {
- if (txn == nullptr) return;
- stats.get_bytes += txn->get_bytes();
- stats.put_bytes += txn->put_bytes();
- stats.del_bytes += txn->delete_bytes();
- stats.get_counter += txn->num_get_keys();
- stats.put_counter += txn->num_put_keys();
- stats.del_counter += txn->num_del_keys();
- };
// Get db id with txn id
std::string index_val;
@@ -2122,468 +2116,522 @@ void commit_txn_with_sub_txn(const CommitTxnRequest*
request, CommitTxnResponse*
}
stats.get_bytes += txn->get_bytes();
stats.get_counter += txn->num_get_keys();
- // Create a read/write txn for guarantee consistency
txn.reset();
- err = txn_kv->create_txn(&txn);
- if (err != TxnErrorCode::TXN_OK) {
- code = cast_as<ErrCategory::CREATE>(err);
- ss << "filed to create txn, txn_id=" << txn_id << " err=" << err;
- msg = ss.str();
- LOG(WARNING) << msg;
- return;
- }
-
- // Get txn info with db_id and txn_id
- std::string info_val; // Will be reused when saving updated txn
- const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
- err = txn->get(info_key, &info_val);
- if (err != TxnErrorCode::TXN_OK) {
- code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ?
MetaServiceCode::TXN_ID_NOT_FOUND
- :
cast_as<ErrCategory::READ>(err);
- ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" <<
txn_id << " err=" << err;
- msg = ss.str();
- LOG(WARNING) << msg;
- return;
- }
-
- TxnInfoPB txn_info;
- if (!txn_info.ParseFromString(info_val)) {
- code = MetaServiceCode::PROTOBUF_PARSE_ERR;
- ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" <<
txn_id;
- msg = ss.str();
- LOG(WARNING) << msg;
- return;
- }
- // TODO: do more check like txn state
- DCHECK(txn_info.txn_id() == txn_id);
- if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) {
- code = MetaServiceCode::TXN_ALREADY_ABORTED;
- ss << "transaction is already aborted: db_id=" << db_id << " txn_id="
<< txn_id;
- msg = ss.str();
- LOG(WARNING) << msg;
- return;
- }
-
- if (txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) {
- code = MetaServiceCode::OK;
- ss << "transaction is already visible: db_id=" << db_id << " txn_id="
<< txn_id;
- msg = ss.str();
- LOG(INFO) << msg;
- response->mutable_txn_info()->CopyFrom(txn_info);
- return;
- }
-
- LOG(INFO) << "txn_id=" << txn_id << " txn_info=" <<
txn_info.ShortDebugString();
+ do {
+ TEST_SYNC_POINT_CALLBACK("commit_txn_with_sub_txn:begin", &txn_id);
+ // Create a read/write txn for guarantee consistency
+ err = txn_kv->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::CREATE>(err);
+ ss << "filed to create txn, txn_id=" << txn_id << " err=" << err;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+ DORIS_CLOUD_DEFER {
+ if (txn == nullptr) return;
+ stats.get_bytes += txn->get_bytes();
+ stats.put_bytes += txn->put_bytes();
+ stats.del_bytes += txn->delete_bytes();
+ stats.get_counter += txn->num_get_keys();
+ stats.put_counter += txn->num_put_keys();
+ stats.del_counter += txn->num_del_keys();
+ };
- // Prepare rowset meta and new_versions
- // Read tablet indexes in batch.
- std::map<int64_t, int64_t> tablet_id_to_idx;
- std::vector<std::string> tablet_idx_keys;
- std::vector<int64_t> partition_ids;
- auto idx = 0;
- for (auto& [_, tmp_rowsets_meta] : sub_txn_to_tmp_rowsets_meta) {
- for (auto& [_, i] : tmp_rowsets_meta) {
- auto tablet_id = i.tablet_id();
- if (tablet_id_to_idx.count(tablet_id) == 0) {
- tablet_id_to_idx.emplace(tablet_id, idx);
- tablet_idx_keys.push_back(meta_tablet_idx_key({instance_id,
i.tablet_id()}));
- partition_ids.push_back(i.partition_id());
- idx++;
+ // Get txn info with db_id and txn_id
+ std::string info_val; // Will be reused when saving updated txn
+ const std::string info_key = txn_info_key({instance_id, db_id,
txn_id});
+ err = txn->get(info_key, &info_val);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ?
MetaServiceCode::TXN_ID_NOT_FOUND
+ :
cast_as<ErrCategory::READ>(err);
+ if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+ ss << "transaction [" << txn_id << "] not found, db_id=" <<
db_id;
+ } else {
+ ss << "failed to get txn_info, db_id=" << db_id << " txn_id="
<< txn_id
+ << " err=" << err;
}
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
}
- }
- std::vector<std::optional<std::string>> tablet_idx_values;
- err = txn->batch_get(&tablet_idx_values, tablet_idx_keys,
Transaction::BatchGetOptions(false));
- if (err != TxnErrorCode::TXN_OK) {
- code = cast_as<ErrCategory::READ>(err);
- ss << "failed to get tablet table index ids, err=" << err;
- msg = ss.str();
- LOG(WARNING) << msg << " txn_id=" << txn_id;
- return;
- }
- // tablet_id -> {table/index/partition}_id
- std::unordered_map<int64_t, TabletIndexPB> tablet_ids;
- // table_id -> tablets_ids
- std::unordered_map<int64_t, std::vector<int64_t>> table_id_tablet_ids;
- for (auto [tablet_id, i] : tablet_id_to_idx) {
- if (!tablet_idx_values[i].has_value()) [[unlikely]] {
- // The value must existed
- code = MetaServiceCode::KV_TXN_GET_ERR;
- ss << "failed to get tablet table index ids, err=not found"
- << " tablet_id=" << tablet_id << " key=" <<
hex(tablet_idx_keys[i]);
+ TxnInfoPB txn_info;
+ if (!txn_info.ParseFromString(info_val)) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" <<
txn_id;
msg = ss.str();
- LOG(WARNING) << msg << " err=" << err << " txn_id=" << txn_id;
+ LOG(WARNING) << msg;
return;
}
- if
(!tablet_ids[tablet_id].ParseFromString(tablet_idx_values[i].value()))
[[unlikely]] {
- code = MetaServiceCode::PROTOBUF_PARSE_ERR;
- ss << "malformed tablet index value tablet_id=" << tablet_id << "
txn_id=" << txn_id;
+
+ // TODO: do more check like txn state
+ DCHECK(txn_info.txn_id() == txn_id);
+ if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) {
+ code = MetaServiceCode::TXN_ALREADY_ABORTED;
+ ss << "transaction is already aborted: db_id=" << db_id << "
txn_id=" << txn_id;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
-
table_id_tablet_ids[tablet_ids[tablet_id].table_id()].push_back(tablet_id);
- VLOG_DEBUG << "tablet_id:" << tablet_id
- << " value:" << tablet_ids[tablet_id].ShortDebugString();
- }
- tablet_idx_keys.clear();
- tablet_idx_values.clear();
+ if (txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) {
+ code = MetaServiceCode::OK;
+ ss << "transaction is already visible: db_id=" << db_id << "
txn_id=" << txn_id;
+ msg = ss.str();
+ LOG(INFO) << msg;
+ response->mutable_txn_info()->CopyFrom(txn_info);
+ return;
+ }
- // {table/partition} -> version
- std::unordered_map<std::string, uint64_t> new_versions;
- std::vector<std::string> version_keys;
- for (auto& [tablet_id, i] : tablet_id_to_idx) {
- int64_t table_id = tablet_ids[tablet_id].table_id();
- int64_t partition_id = partition_ids[i];
- std::string ver_key = partition_version_key({instance_id, db_id,
table_id, partition_id});
- if (new_versions.count(ver_key) == 0) {
- new_versions.insert({ver_key, 0});
- LOG(INFO) << "xxx add a partition_version_key=" << hex(ver_key) <<
" txn_id=" << txn_id
- << ", db_id=" << db_id << ", table_id=" << table_id
- << ", partition_id=" << partition_id;
- version_keys.push_back(std::move(ver_key));
- }
- }
- std::vector<std::optional<std::string>> version_values;
- err = txn->batch_get(&version_values, version_keys);
- if (err != TxnErrorCode::TXN_OK) {
- code = cast_as<ErrCategory::READ>(err);
- ss << "failed to get partition versions, err=" << err;
- msg = ss.str();
- LOG(WARNING) << msg << " txn_id=" << txn_id;
- return;
- }
- size_t total_versions = version_keys.size();
- for (size_t i = 0; i < total_versions; i++) {
- int64_t version;
- if (version_values[i].has_value()) {
- VersionPB version_pb;
- if (!version_pb.ParseFromString(version_values[i].value())) {
+ LOG(INFO) << "txn_id=" << txn_id << " txn_info=" <<
txn_info.ShortDebugString();
+
+ // Prepare rowset meta and new_versions
+ // Read tablet indexes in batch.
+ std::map<int64_t, int64_t> tablet_id_to_idx;
+ std::vector<std::string> tablet_idx_keys;
+ std::vector<int64_t> partition_ids;
+ auto idx = 0;
+ for (auto& [_, tmp_rowsets_meta] : sub_txn_to_tmp_rowsets_meta) {
+ for (auto& [_, i] : tmp_rowsets_meta) {
+ auto tablet_id = i.tablet_id();
+ if (tablet_id_to_idx.count(tablet_id) == 0) {
+ tablet_id_to_idx.emplace(tablet_id, idx);
+
tablet_idx_keys.push_back(meta_tablet_idx_key({instance_id, i.tablet_id()}));
+ partition_ids.push_back(i.partition_id());
+ idx++;
+ }
+ }
+ }
+ std::vector<std::optional<std::string>> tablet_idx_values;
+ err = txn->batch_get(&tablet_idx_values, tablet_idx_keys,
+ Transaction::BatchGetOptions(false));
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::READ>(err);
+ ss << "failed to get tablet table index ids, err=" << err;
+ msg = ss.str();
+ LOG(WARNING) << msg << " txn_id=" << txn_id;
+ return;
+ }
+
+ // tablet_id -> {table/index/partition}_id
+ std::unordered_map<int64_t, TabletIndexPB> tablet_ids;
+ // table_id -> tablets_ids
+ std::unordered_map<int64_t, std::vector<int64_t>> table_id_tablet_ids;
+ for (auto [tablet_id, i] : tablet_id_to_idx) {
+ if (!tablet_idx_values[i].has_value()) [[unlikely]] {
+ // The value must existed
+ code = MetaServiceCode::KV_TXN_GET_ERR;
+ ss << "failed to get tablet table index ids, err=not found"
+ << " tablet_id=" << tablet_id << " key=" <<
hex(tablet_idx_keys[i]);
+ msg = ss.str();
+ LOG(WARNING) << msg << " err=" << err << " txn_id=" << txn_id;
+ return;
+ }
+ if
(!tablet_ids[tablet_id].ParseFromString(tablet_idx_values[i].value()))
[[unlikely]] {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
- ss << "failed to parse version pb txn_id=" << txn_id
- << " key=" << hex(version_keys[i]);
+ ss << "malformed tablet index value tablet_id=" << tablet_id
+ << " txn_id=" << txn_id;
msg = ss.str();
+ LOG(WARNING) << msg;
return;
}
- version = version_pb.version();
- } else {
- version = 1;
+
table_id_tablet_ids[tablet_ids[tablet_id].table_id()].push_back(tablet_id);
+ VLOG_DEBUG << "tablet_id:" << tablet_id
+ << " value:" <<
tablet_ids[tablet_id].ShortDebugString();
}
- new_versions[version_keys[i]] = version;
- LOG(INFO) << "xxx get partition_version_key=" << hex(version_keys[i])
- << " version:" << version << " txn_id=" << txn_id;
- }
- version_keys.clear();
- version_values.clear();
- std::vector<std::pair<std::string, std::string>> rowsets;
- std::unordered_map<int64_t, TabletStats> tablet_stats; // tablet_id ->
stats
- for (const auto& sub_txn_info : sub_txn_infos) {
- auto sub_txn_id = sub_txn_info.sub_txn_id();
- auto tmp_rowsets_meta = sub_txn_to_tmp_rowsets_meta[sub_txn_id];
- std::unordered_map<int64_t, int64_t> partition_id_to_version;
- for (auto& [_, i] : tmp_rowsets_meta) {
- int64_t tablet_id = i.tablet_id();
+ tablet_idx_keys.clear();
+ tablet_idx_values.clear();
+
+ // {table/partition} -> version
+ std::unordered_map<std::string, uint64_t> new_versions;
+ std::vector<std::string> version_keys;
+ for (auto& [tablet_id, i] : tablet_id_to_idx) {
int64_t table_id = tablet_ids[tablet_id].table_id();
- int64_t partition_id = i.partition_id();
+ int64_t partition_id = partition_ids[i];
std::string ver_key =
partition_version_key({instance_id, db_id, table_id,
partition_id});
- if (new_versions.count(ver_key) == 0) [[unlikely]] {
- // it is impossible.
- code = MetaServiceCode::UNDEFINED_ERR;
- ss << "failed to get partition version key, the target version
not exists in "
- "new_versions."
- << " txn_id=" << txn_id << ", db_id=" << db_id << ",
table_id=" << table_id
- << ", partition_id=" << partition_id;
- msg = ss.str();
- LOG(ERROR) << msg;
- return;
+ if (new_versions.count(ver_key) == 0) {
+ new_versions.insert({ver_key, 0});
+ LOG(INFO) << "xxx add a partition_version_key=" << hex(ver_key)
+ << " txn_id=" << txn_id << ", db_id=" << db_id
+ << ", table_id=" << table_id << ", partition_id=" <<
partition_id;
+ version_keys.push_back(std::move(ver_key));
}
-
- // Update rowset version
- int64_t new_version = new_versions[ver_key];
- if (partition_id_to_version.count(partition_id) == 0) {
- new_versions[ver_key] = new_version + 1;
- new_version = new_versions[ver_key];
- partition_id_to_version[partition_id] = new_version;
+ }
+ std::vector<std::optional<std::string>> version_values;
+ err = txn->batch_get(&version_values, version_keys);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::READ>(err);
+ ss << "failed to get partition versions, err=" << err;
+ msg = ss.str();
+ LOG(WARNING) << msg << " txn_id=" << txn_id;
+ return;
+ }
+ size_t total_versions = version_keys.size();
+ int64_t last_pending_txn_id = 0;
+ for (size_t i = 0; i < total_versions; i++) {
+ int64_t version;
+ if (version_values[i].has_value()) {
+ VersionPB version_pb;
+ if (!version_pb.ParseFromString(version_values[i].value())) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ ss << "failed to parse version pb txn_id=" << txn_id
+ << " key=" << hex(version_keys[i]);
+ msg = ss.str();
+ return;
+ }
+ if (version_pb.pending_txn_ids_size() > 0) {
+ DCHECK(version_pb.pending_txn_ids_size() == 1);
+ last_pending_txn_id = version_pb.pending_txn_ids(0);
+ DCHECK(last_pending_txn_id > 0);
+ break;
+ }
+ version = version_pb.version();
+ } else {
+ version = 1;
}
- i.set_start_version(new_version);
- i.set_end_version(new_version);
- LOG(INFO) << "xxx update rowset version, txn_id=" << txn_id
- << ", sub_txn_id=" << sub_txn_id << ", table_id=" <<
table_id
- << ", partition_id=" << partition_id << ", tablet_id="
<< tablet_id
- << ", new_version=" << new_version;
+ new_versions[version_keys[i]] = version;
+ last_pending_txn_id = 0;
+ LOG(INFO) << "xxx get partition_version_key=" <<
hex(version_keys[i])
+ << " version:" << version << " txn_id=" << txn_id;
+ }
+ version_keys.clear();
+ version_values.clear();
- std::string key = meta_rowset_key({instance_id, tablet_id,
i.end_version()});
- std::string val;
- if (!i.SerializeToString(&val)) {
- code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
- ss << "failed to serialize rowset_meta, txn_id=" << txn_id;
- msg = ss.str();
+ if (last_pending_txn_id > 0) {
+ stats.get_bytes += txn->get_bytes();
+ stats.get_counter += txn->num_get_keys();
+ txn.reset();
+
TEST_SYNC_POINT_CALLBACK("commit_txn_with_sub_txn::advance_last_pending_txn_id",
+ &last_pending_txn_id);
+ std::shared_ptr<TxnLazyCommitTask> task =
+ txn_lazy_committer->submit(instance_id,
last_pending_txn_id);
+
+ std::tie(code, msg) = task->wait();
+ if (code != MetaServiceCode::OK) {
+ LOG(WARNING) << "advance_last_txn failed last_txn=" <<
last_pending_txn_id
+ << " code=" << code << " msg=" << msg;
return;
}
- rowsets.emplace_back(std::move(key), std::move(val));
+ last_pending_txn_id = 0;
+ continue;
+ }
- // Accumulate affected rows
- auto& stats = tablet_stats[tablet_id];
- stats.data_size += i.total_disk_size();
- stats.num_rows += i.num_rows();
- ++stats.num_rowsets;
- stats.num_segs += i.num_segments();
- stats.index_size += i.index_disk_size();
- stats.segment_size += i.data_disk_size();
- } // for tmp_rowsets_meta
- }
+ std::vector<std::pair<std::string, std::string>> rowsets;
+ std::unordered_map<int64_t, TabletStats> tablet_stats; // tablet_id ->
stats
+ for (const auto& sub_txn_info : sub_txn_infos) {
+ auto sub_txn_id = sub_txn_info.sub_txn_id();
+ auto tmp_rowsets_meta = sub_txn_to_tmp_rowsets_meta[sub_txn_id];
+ std::unordered_map<int64_t, int64_t> partition_id_to_version;
+ for (auto& [_, i] : tmp_rowsets_meta) {
+ int64_t tablet_id = i.tablet_id();
+ int64_t table_id = tablet_ids[tablet_id].table_id();
+ int64_t partition_id = i.partition_id();
+ std::string ver_key =
+ partition_version_key({instance_id, db_id, table_id,
partition_id});
+ if (new_versions.count(ver_key) == 0) [[unlikely]] {
+ // it is impossible.
+ code = MetaServiceCode::UNDEFINED_ERR;
+ ss << "failed to get partition version key, the target
version not exists in "
+ "new_versions."
+ << " txn_id=" << txn_id << ", db_id=" << db_id << ",
table_id=" << table_id
+ << ", partition_id=" << partition_id;
+ msg = ss.str();
+ LOG(ERROR) << msg;
+ return;
+ }
- // Save rowset meta
- for (auto& i : rowsets) {
- size_t rowset_size = i.first.size() + i.second.size();
- txn->put(i.first, i.second);
- LOG(INFO) << "xxx put rowset_key=" << hex(i.first) << " txn_id=" <<
txn_id
- << " rowset_size=" << rowset_size;
- }
+ // Update rowset version
+ int64_t new_version = new_versions[ver_key];
+ if (partition_id_to_version.count(partition_id) == 0) {
+ new_versions[ver_key] = new_version + 1;
+ new_version = new_versions[ver_key];
+ partition_id_to_version[partition_id] = new_version;
+ }
+ i.set_start_version(new_version);
+ i.set_end_version(new_version);
+ LOG(INFO) << "xxx update rowset version, txn_id=" << txn_id
+ << ", sub_txn_id=" << sub_txn_id << ", table_id=" <<
table_id
+ << ", partition_id=" << partition_id << ",
tablet_id=" << tablet_id
+ << ", new_version=" << new_version;
+
+ std::string key = meta_rowset_key({instance_id, tablet_id,
i.end_version()});
+ std::string val;
+ if (!i.SerializeToString(&val)) {
+ code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+ ss << "failed to serialize rowset_meta, txn_id=" << txn_id;
+ msg = ss.str();
+ return;
+ }
+ rowsets.emplace_back(std::move(key), std::move(val));
- // Save versions
- for (auto& i : new_versions) {
- std::string ver_val;
- VersionPB version_pb;
- version_pb.set_version(i.second);
- if (!version_pb.SerializeToString(&ver_val)) {
- code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
- ss << "failed to serialize version_pb when saving, txn_id=" <<
txn_id;
- msg = ss.str();
- return;
+ // Accumulate affected rows
+ auto& stats = tablet_stats[tablet_id];
+ stats.data_size += i.total_disk_size();
+ stats.num_rows += i.num_rows();
+ ++stats.num_rowsets;
+ stats.num_segs += i.num_segments();
+ stats.index_size += i.index_disk_size();
+ stats.segment_size += i.data_disk_size();
+ } // for tmp_rowsets_meta
}
- txn->put(i.first, ver_val);
- LOG(INFO) << "xxx put partition_version_key=" << hex(i.first) << "
version:" << i.second
- << " txn_id=" << txn_id;
-
- std::string_view ver_key = i.first;
- ver_key.remove_prefix(1); // Remove key space
- // PartitionVersionKeyInfo {instance_id, db_id, table_id,
partition_id}
- std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>>
out;
- int ret = decode_key(&ver_key, &out);
- if (ret != 0) [[unlikely]] {
- // decode version key error means this is something wrong,
- // we can not continue this txn
- LOG(WARNING) << "failed to decode key, ret=" << ret << " key=" <<
hex(ver_key);
- code = MetaServiceCode::UNDEFINED_ERR;
- msg = "decode version key error";
- return;
+ // Save rowset meta
+ for (auto& i : rowsets) {
+ size_t rowset_size = i.first.size() + i.second.size();
+ txn->put(i.first, i.second);
+ LOG(INFO) << "xxx put rowset_key=" << hex(i.first) << " txn_id="
<< txn_id
+ << " rowset_size=" << rowset_size;
}
- int64_t table_id = std::get<int64_t>(std::get<0>(out[4]));
- int64_t partition_id = std::get<int64_t>(std::get<0>(out[5]));
- VLOG_DEBUG << "txn_id=" << txn_id << " table_id=" << table_id
- << " partition_id=" << partition_id << " version=" <<
i.second;
+ // Save versions
+ for (auto& i : new_versions) {
+ std::string ver_val;
+ VersionPB version_pb;
+ version_pb.set_version(i.second);
+ if (!version_pb.SerializeToString(&ver_val)) {
+ code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+ ss << "failed to serialize version_pb when saving, txn_id=" <<
txn_id;
+ msg = ss.str();
+ return;
+ }
- response->add_table_ids(table_id);
- response->add_partition_ids(partition_id);
- response->add_versions(i.second);
- }
+ txn->put(i.first, ver_val);
+ LOG(INFO) << "xxx put partition_version_key=" << hex(i.first) << "
version:" << i.second
+ << " txn_id=" << txn_id;
- // Save table versions
- for (auto& i : table_id_tablet_ids) {
- std::string ver_key = table_version_key({instance_id, db_id, i.first});
- txn->atomic_add(ver_key, 1);
- LOG(INFO) << "xxx atomic add table_version_key=" << hex(ver_key) << "
txn_id=" << txn_id;
- }
+ std::string_view ver_key = i.first;
+ ver_key.remove_prefix(1); // Remove key space
+ // PartitionVersionKeyInfo {instance_id, db_id, table_id,
partition_id}
+ std::vector<std::tuple<std::variant<int64_t, std::string>, int,
int>> out;
+ int ret = decode_key(&ver_key, &out);
+ if (ret != 0) [[unlikely]] {
+ // decode version key error means this is something wrong,
+ // we can not continue this txn
+ LOG(WARNING) << "failed to decode key, ret=" << ret << " key="
<< hex(ver_key);
+ code = MetaServiceCode::UNDEFINED_ERR;
+ msg = "decode version key error";
+ return;
+ }
- LOG(INFO) << " before update txn_info=" << txn_info.ShortDebugString();
+ int64_t table_id = std::get<int64_t>(std::get<0>(out[4]));
+ int64_t partition_id = std::get<int64_t>(std::get<0>(out[5]));
+ VLOG_DEBUG << "txn_id=" << txn_id << " table_id=" << table_id
+ << " partition_id=" << partition_id << " version=" <<
i.second;
- // Update txn_info
- txn_info.set_status(TxnStatusPB::TXN_STATUS_VISIBLE);
+ response->add_table_ids(table_id);
+ response->add_partition_ids(partition_id);
+ response->add_versions(i.second);
+ }
- auto now_time = system_clock::now();
- uint64_t commit_time =
duration_cast<milliseconds>(now_time.time_since_epoch()).count();
- if ((txn_info.prepare_time() + txn_info.timeout_ms()) < commit_time) {
- code = MetaServiceCode::UNDEFINED_ERR;
- msg = fmt::format("txn is expired, not allow to commit txn_id={}",
txn_id);
- LOG(INFO) << msg << " prepare_time=" << txn_info.prepare_time()
- << " timeout_ms=" << txn_info.timeout_ms() << "
commit_time=" << commit_time;
- return;
- }
- txn_info.set_commit_time(commit_time);
- txn_info.set_finish_time(commit_time);
- if (request->has_commit_attachment()) {
-
txn_info.mutable_commit_attachment()->CopyFrom(request->commit_attachment());
- }
- LOG(INFO) << "after update txn_info=" << txn_info.ShortDebugString();
- info_val.clear();
- if (!txn_info.SerializeToString(&info_val)) {
- code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
- ss << "failed to serialize txn_info when saving, txn_id=" << txn_id;
- msg = ss.str();
- return;
- }
- txn->put(info_key, info_val);
- LOG(INFO) << "xxx put info_key=" << hex(info_key) << " txn_id=" << txn_id;
+ // Save table versions
+ for (auto& i : table_id_tablet_ids) {
+ std::string ver_key = table_version_key({instance_id, db_id,
i.first});
+ txn->atomic_add(ver_key, 1);
+ LOG(INFO) << "xxx atomic add table_version_key=" << hex(ver_key)
+ << " txn_id=" << txn_id;
+ }
- // Update stats of affected tablet
- std::deque<std::string> kv_pool;
- std::function<void(const StatsTabletKeyInfo&, const TabletStats&)>
update_tablet_stats;
- if (config::split_tablet_stats) {
- update_tablet_stats = [&](const StatsTabletKeyInfo& info, const
TabletStats& stats) {
- if (stats.num_segs > 0) {
- auto& data_size_key = kv_pool.emplace_back();
- stats_tablet_data_size_key(info, &data_size_key);
- txn->atomic_add(data_size_key, stats.data_size);
- auto& num_rows_key = kv_pool.emplace_back();
- stats_tablet_num_rows_key(info, &num_rows_key);
- txn->atomic_add(num_rows_key, stats.num_rows);
- auto& num_segs_key = kv_pool.emplace_back();
- stats_tablet_num_segs_key(info, &num_segs_key);
- txn->atomic_add(num_segs_key, stats.num_segs);
- auto& index_size_key = kv_pool.emplace_back();
- stats_tablet_index_size_key(info, &index_size_key);
- txn->atomic_add(index_size_key, stats.index_size);
- auto& segment_size_key = kv_pool.emplace_back();
- stats_tablet_segment_size_key(info, &segment_size_key);
- txn->atomic_add(segment_size_key, stats.segment_size);
- }
- auto& num_rowsets_key = kv_pool.emplace_back();
- stats_tablet_num_rowsets_key(info, &num_rowsets_key);
- txn->atomic_add(num_rowsets_key, stats.num_rowsets);
- };
- } else {
- update_tablet_stats = [&](const StatsTabletKeyInfo& info, const
TabletStats& stats) {
- auto& key = kv_pool.emplace_back();
- stats_tablet_key(info, &key);
- auto& val = kv_pool.emplace_back();
- TxnErrorCode err = txn->get(key, &val);
- if (err != TxnErrorCode::TXN_OK) {
- code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ?
MetaServiceCode::TABLET_NOT_FOUND
- :
cast_as<ErrCategory::READ>(err);
- msg = fmt::format("failed to get tablet stats, err={}
tablet_id={}", err,
- std::get<4>(info));
- return;
- }
- TabletStatsPB stats_pb;
- if (!stats_pb.ParseFromString(val)) {
- code = MetaServiceCode::PROTOBUF_PARSE_ERR;
- msg = fmt::format("malformed tablet stats value, key={}",
hex(key));
- return;
+ LOG(INFO) << " before update txn_info=" << txn_info.ShortDebugString();
+
+ // Update txn_info
+ txn_info.set_status(TxnStatusPB::TXN_STATUS_VISIBLE);
+
+ auto now_time = system_clock::now();
+ uint64_t commit_time =
duration_cast<milliseconds>(now_time.time_since_epoch()).count();
+ if ((txn_info.prepare_time() + txn_info.timeout_ms()) < commit_time) {
+ code = MetaServiceCode::UNDEFINED_ERR;
+ msg = fmt::format("txn is expired, not allow to commit txn_id={}",
txn_id);
+ LOG(INFO) << msg << " prepare_time=" << txn_info.prepare_time()
+ << " timeout_ms=" << txn_info.timeout_ms() << "
commit_time=" << commit_time;
+ return;
+ }
+ txn_info.set_commit_time(commit_time);
+ txn_info.set_finish_time(commit_time);
+ if (request->has_commit_attachment()) {
+
txn_info.mutable_commit_attachment()->CopyFrom(request->commit_attachment());
+ }
+ LOG(INFO) << "after update txn_info=" << txn_info.ShortDebugString();
+ info_val.clear();
+ if (!txn_info.SerializeToString(&info_val)) {
+ code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+ ss << "failed to serialize txn_info when saving, txn_id=" <<
txn_id;
+ msg = ss.str();
+ return;
+ }
+ txn->put(info_key, info_val);
+ LOG(INFO) << "xxx put info_key=" << hex(info_key) << " txn_id=" <<
txn_id;
+
+ // Update stats of affected tablet
+ std::deque<std::string> kv_pool;
+ std::function<void(const StatsTabletKeyInfo&, const TabletStats&)>
update_tablet_stats;
+ if (config::split_tablet_stats) {
+ update_tablet_stats = [&](const StatsTabletKeyInfo& info, const
TabletStats& stats) {
+ if (stats.num_segs > 0) {
+ auto& data_size_key = kv_pool.emplace_back();
+ stats_tablet_data_size_key(info, &data_size_key);
+ txn->atomic_add(data_size_key, stats.data_size);
+ auto& num_rows_key = kv_pool.emplace_back();
+ stats_tablet_num_rows_key(info, &num_rows_key);
+ txn->atomic_add(num_rows_key, stats.num_rows);
+ auto& num_segs_key = kv_pool.emplace_back();
+ stats_tablet_num_segs_key(info, &num_segs_key);
+ txn->atomic_add(num_segs_key, stats.num_segs);
+ auto& index_size_key = kv_pool.emplace_back();
+ stats_tablet_index_size_key(info, &index_size_key);
+ txn->atomic_add(index_size_key, stats.index_size);
+ auto& segment_size_key = kv_pool.emplace_back();
+ stats_tablet_segment_size_key(info, &segment_size_key);
+ txn->atomic_add(segment_size_key, stats.segment_size);
+ }
+ auto& num_rowsets_key = kv_pool.emplace_back();
+ stats_tablet_num_rowsets_key(info, &num_rowsets_key);
+ txn->atomic_add(num_rowsets_key, stats.num_rowsets);
+ };
+ } else {
+ update_tablet_stats = [&](const StatsTabletKeyInfo& info, const
TabletStats& stats) {
+ auto& key = kv_pool.emplace_back();
+ stats_tablet_key(info, &key);
+ auto& val = kv_pool.emplace_back();
+ TxnErrorCode err = txn->get(key, &val);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = err == TxnErrorCode::TXN_KEY_NOT_FOUND
+ ? MetaServiceCode::TABLET_NOT_FOUND
+ : cast_as<ErrCategory::READ>(err);
+ msg = fmt::format("failed to get tablet stats, err={}
tablet_id={}", err,
+ std::get<4>(info));
+ return;
+ }
+ TabletStatsPB stats_pb;
+ if (!stats_pb.ParseFromString(val)) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ msg = fmt::format("malformed tablet stats value, key={}",
hex(key));
+ return;
+ }
+ stats_pb.set_data_size(stats_pb.data_size() + stats.data_size);
+ stats_pb.set_num_rows(stats_pb.num_rows() + stats.num_rows);
+ stats_pb.set_num_rowsets(stats_pb.num_rowsets() +
stats.num_rowsets);
+ stats_pb.set_num_segments(stats_pb.num_segments() +
stats.num_segs);
+ stats_pb.set_index_size(stats_pb.index_size() +
stats.index_size);
+ stats_pb.set_segment_size(stats_pb.segment_size() +
stats.segment_size);
+ stats_pb.SerializeToString(&val);
+ txn->put(key, val);
+ LOG(INFO) << "put stats_tablet_key, key=" << hex(key);
+ };
+ }
+ for (auto& [tablet_id, stats] : tablet_stats) {
+ DCHECK(tablet_ids.count(tablet_id));
+ auto& tablet_idx = tablet_ids[tablet_id];
+ StatsTabletKeyInfo info {instance_id, tablet_idx.table_id(),
tablet_idx.index_id(),
+ tablet_idx.partition_id(), tablet_id};
+ update_tablet_stats(info, stats);
+ if (code != MetaServiceCode::OK) return;
+ }
+ // Remove tmp rowset meta
+ for (auto& [_, tmp_rowsets_meta] : sub_txn_to_tmp_rowsets_meta) {
+ for (auto& [k, _] : tmp_rowsets_meta) {
+ txn->remove(k);
+ LOG(INFO) << "xxx remove tmp_rowset_key=" << hex(k) << "
txn_id=" << txn_id;
}
- stats_pb.set_data_size(stats_pb.data_size() + stats.data_size);
- stats_pb.set_num_rows(stats_pb.num_rows() + stats.num_rows);
- stats_pb.set_num_rowsets(stats_pb.num_rowsets() +
stats.num_rowsets);
- stats_pb.set_num_segments(stats_pb.num_segments() +
stats.num_segs);
- stats_pb.set_index_size(stats_pb.index_size() + stats.index_size);
- stats_pb.set_segment_size(stats_pb.segment_size() +
stats.segment_size);
- stats_pb.SerializeToString(&val);
- txn->put(key, val);
- LOG(INFO) << "put stats_tablet_key, key=" << hex(key);
- };
- }
- for (auto& [tablet_id, stats] : tablet_stats) {
- DCHECK(tablet_ids.count(tablet_id));
- auto& tablet_idx = tablet_ids[tablet_id];
- StatsTabletKeyInfo info {instance_id, tablet_idx.table_id(),
tablet_idx.index_id(),
- tablet_idx.partition_id(), tablet_id};
- update_tablet_stats(info, stats);
- if (code != MetaServiceCode::OK) return;
- }
- // Remove tmp rowset meta
- for (auto& [_, tmp_rowsets_meta] : sub_txn_to_tmp_rowsets_meta) {
- for (auto& [k, _] : tmp_rowsets_meta) {
- txn->remove(k);
- LOG(INFO) << "xxx remove tmp_rowset_key=" << hex(k) << " txn_id="
<< txn_id;
}
- }
- const std::string running_key = txn_running_key({instance_id, db_id,
txn_id});
- LOG(INFO) << "xxx remove running_key=" << hex(running_key) << " txn_id="
<< txn_id;
- txn->remove(running_key);
+ const std::string running_key = txn_running_key({instance_id, db_id,
txn_id});
+ LOG(INFO) << "xxx remove running_key=" << hex(running_key) << "
txn_id=" << txn_id;
+ txn->remove(running_key);
- std::string recycle_val;
- std::string recycle_key = recycle_txn_key({instance_id, db_id, txn_id});
- RecycleTxnPB recycle_pb;
- recycle_pb.set_creation_time(commit_time);
- recycle_pb.set_label(txn_info.label());
+ std::string recycle_val;
+ std::string recycle_key = recycle_txn_key({instance_id, db_id,
txn_id});
+ RecycleTxnPB recycle_pb;
+ recycle_pb.set_creation_time(commit_time);
+ recycle_pb.set_label(txn_info.label());
- if (!recycle_pb.SerializeToString(&recycle_val)) {
- code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
- ss << "failed to serialize recycle_pb, txn_id=" << txn_id;
- msg = ss.str();
- return;
- }
- txn->put(recycle_key, recycle_val);
- LOG(INFO) << "xxx commit_txn put recycle_txn_key key=" << hex(recycle_key)
- << " txn_id=" << txn_id;
+ if (!recycle_pb.SerializeToString(&recycle_val)) {
+ code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+ ss << "failed to serialize recycle_pb, txn_id=" << txn_id;
+ msg = ss.str();
+ return;
+ }
+ txn->put(recycle_key, recycle_val);
+ LOG(INFO) << "xxx commit_txn put recycle_txn_key key=" <<
hex(recycle_key)
+ << " txn_id=" << txn_id;
- LOG(INFO) << "commit_txn put_size=" << txn->put_bytes() << " del_size=" <<
txn->delete_bytes()
- << " num_put_keys=" << txn->num_put_keys() << " num_del_keys="
<< txn->num_del_keys()
- << " txn_size=" << txn->approximate_bytes() << " txn_id=" <<
txn_id;
+ LOG(INFO) << "commit_txn put_size=" << txn->put_bytes()
+ << " del_size=" << txn->delete_bytes() << " num_put_keys="
<< txn->num_put_keys()
+ << " num_del_keys=" << txn->num_del_keys()
+ << " txn_size=" << txn->approximate_bytes() << " txn_id=" <<
txn_id;
- // Finally we are done...
- err = txn->commit();
- if (err != TxnErrorCode::TXN_OK) {
- if (err == TxnErrorCode::TXN_VALUE_TOO_LARGE || err ==
TxnErrorCode::TXN_BYTES_TOO_LARGE) {
- size_t max_size = 0, max_num_segments = 0,
- min_num_segments = std::numeric_limits<size_t>::max(),
avg_num_segments = 0;
- std::pair<std::string, RowsetMetaCloudPB>* max_rowset_meta =
nullptr;
- for (auto& sub_txn : sub_txn_infos) {
- auto it =
sub_txn_to_tmp_rowsets_meta.find(sub_txn.sub_txn_id());
- if (it == sub_txn_to_tmp_rowsets_meta.end()) {
- continue;
- }
- for (auto& rowset_meta : it->second) {
- if (rowset_meta.second.ByteSizeLong() > max_size) {
- max_size = rowset_meta.second.ByteSizeLong();
- max_rowset_meta = &rowset_meta;
+
TEST_SYNC_POINT_RETURN_WITH_VOID("commit_txn_with_sub_txn::before_commit",
&err, &code);
+ err = txn->commit();
+ if (err != TxnErrorCode::TXN_OK) {
+ if (err == TxnErrorCode::TXN_VALUE_TOO_LARGE ||
+ err == TxnErrorCode::TXN_BYTES_TOO_LARGE) {
+ size_t max_size = 0, max_num_segments = 0,
+ min_num_segments = std::numeric_limits<size_t>::max(),
avg_num_segments = 0;
+ std::pair<std::string, RowsetMetaCloudPB>* max_rowset_meta =
nullptr;
+ for (auto& sub_txn : sub_txn_infos) {
+ auto it =
sub_txn_to_tmp_rowsets_meta.find(sub_txn.sub_txn_id());
+ if (it == sub_txn_to_tmp_rowsets_meta.end()) {
+ continue;
}
- if (rowset_meta.second.num_segments() > max_num_segments) {
- max_num_segments = rowset_meta.second.num_segments();
+ for (auto& rowset_meta : it->second) {
+ if (rowset_meta.second.ByteSizeLong() > max_size) {
+ max_size = rowset_meta.second.ByteSizeLong();
+ max_rowset_meta = &rowset_meta;
+ }
+ if (rowset_meta.second.num_segments() >
max_num_segments) {
+ max_num_segments =
rowset_meta.second.num_segments();
+ }
+ if (rowset_meta.second.num_segments() <
min_num_segments) {
+ min_num_segments =
rowset_meta.second.num_segments();
+ }
+ avg_num_segments += rowset_meta.second.num_segments();
}
- if (rowset_meta.second.num_segments() < min_num_segments) {
- min_num_segments = rowset_meta.second.num_segments();
+ if (!it->second.empty()) {
+ avg_num_segments /= it->second.size();
}
- avg_num_segments += rowset_meta.second.num_segments();
}
- if (!it->second.empty()) {
- avg_num_segments /= it->second.size();
+ if (max_rowset_meta) {
+ LOG(WARNING) << "failed to commit kv txn with sub txn"
+ << ", err=" << err << ", txn_id=" << txn_id
+ << ", total_rowsets=" << rowsets.size()
+ << ", avg_num_segments=" << avg_num_segments
+ << ", min_num_segments=" << min_num_segments
+ << ", max_num_segments=" << max_num_segments
+ << ", largest_rowset_size=" << max_size
+ << ", largest_rowset_key=" <<
hex(max_rowset_meta->first)
+ << ", largest_rowset_value="
+ << max_rowset_meta->second.ShortDebugString();
}
}
- if (max_rowset_meta) {
- LOG(WARNING) << "failed to commit kv txn with sub txn"
- << ", err=" << err << ", txn_id=" << txn_id
- << ", total_rowsets=" << rowsets.size()
- << ", avg_num_segments=" << avg_num_segments
- << ", min_num_segments=" << min_num_segments
- << ", max_num_segments=" << max_num_segments
- << ", largest_rowset_size=" << max_size
- << ", largest_rowset_key=" <<
hex(max_rowset_meta->first)
- << ", largest_rowset_value="
- << max_rowset_meta->second.ShortDebugString();
- }
+ code = cast_as<ErrCategory::COMMIT>(err);
+ ss << "failed to commit kv txn with sub txn, txn_id=" << txn_id <<
" err=" << err;
+ msg = ss.str();
+ return;
}
- code = cast_as<ErrCategory::COMMIT>(err);
- ss << "failed to commit kv txn with sub txn, txn_id=" << txn_id << "
err=" << err;
- msg = ss.str();
- return;
- }
- // calculate table stats from tablets stats
- std::map<int64_t /*table_id*/, TableStats> table_stats;
- std::vector<int64_t> base_tablet_ids(request->base_tablet_ids().begin(),
- request->base_tablet_ids().end());
- calc_table_stats(tablet_ids, tablet_stats, table_stats, base_tablet_ids);
- for (const auto& pair : table_stats) {
- TableStatsPB* stats_pb = response->add_table_stats();
- auto table_id = pair.first;
- auto stats = pair.second;
- get_pb_from_tablestats(stats, stats_pb);
- stats_pb->set_table_id(table_id);
- VLOG_DEBUG << "Add TableStats to CommitTxnResponse. txn_id=" << txn_id
- << " table_id=" << table_id
- << " updated_row_count=" << stats_pb->updated_row_count();
- }
+ // calculate table stats from tablets stats
+ std::map<int64_t /*table_id*/, TableStats> table_stats;
+ std::vector<int64_t>
base_tablet_ids(request->base_tablet_ids().begin(),
+ request->base_tablet_ids().end());
+ calc_table_stats(tablet_ids, tablet_stats, table_stats,
base_tablet_ids);
+ for (const auto& pair : table_stats) {
+ TableStatsPB* stats_pb = response->add_table_stats();
+ auto table_id = pair.first;
+ auto stats = pair.second;
+ get_pb_from_tablestats(stats, stats_pb);
+ stats_pb->set_table_id(table_id);
+ VLOG_DEBUG << "Add TableStats to CommitTxnResponse. txn_id=" <<
txn_id
+ << " table_id=" << table_id
+ << " updated_row_count=" <<
stats_pb->updated_row_count();
+ }
- response->mutable_txn_info()->CopyFrom(txn_info);
+ response->mutable_txn_info()->CopyFrom(txn_info);
+ TEST_SYNC_POINT_CALLBACK("commit_txn_with_sub_txn::finish", &code);
+ break;
+ } while (true);
} // end commit_txn_with_sub_txn
static bool force_txn_lazy_commit() {
@@ -2614,7 +2662,8 @@ void
MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller,
RPC_RATE_LIMIT(commit_txn)
if (request->has_is_txn_load() && request->is_txn_load()) {
- commit_txn_with_sub_txn(request, response, txn_kv_, code, msg,
instance_id, stats);
+ commit_txn_with_sub_txn(request, response, txn_kv_,
txn_lazy_committer_, code, msg,
+ instance_id, stats);
return;
}
diff --git a/cloud/src/meta-service/txn_lazy_committer.cpp
b/cloud/src/meta-service/txn_lazy_committer.cpp
index c4e67b2ef01..795f2f21cb7 100644
--- a/cloud/src/meta-service/txn_lazy_committer.cpp
+++ b/cloud/src/meta-service/txn_lazy_committer.cpp
@@ -451,6 +451,7 @@ void TxnLazyCommitTask::commit() {
<< " txn_id=" << txn_id_;
}
+ TEST_SYNC_POINT_CALLBACK("TxnLazyCommitter::commit");
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code_ = cast_as<ErrCategory::COMMIT>(err);
diff --git a/cloud/test/txn_lazy_commit_test.cpp
b/cloud/test/txn_lazy_commit_test.cpp
index 2e018839e4e..62f959167e6 100644
--- a/cloud/test/txn_lazy_commit_test.cpp
+++ b/cloud/test/txn_lazy_commit_test.cpp
@@ -1825,6 +1825,285 @@ TEST(TxnLazyCommitTest,
ConcurrentCommitTxnEventuallyCase4Test) {
ASSERT_EQ(txn_id, txn_info_pb.txn_id());
}
+TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase5Test) {
+    // ===========================================================================
+    // threads concurrent execution flow:
+    //
+    // thread1                              thread2
+    //    |                                    |
+    // commit_txn_eventually begin      commit_txn_with_sub_txn begin
+    //    |                                    |
+    // lazy commit wait                        |
+    //    |                                    |
+    //    |                           advance last txn
+    //    |                                    |
+    //    |                                 finish
+    //    |                                    |
+    // finish                                  |
+    //    |                                    |
+    //    |                                    |
+    //    v                                    v
+    //
+    // thread1 commits a txn with lazy commit enabled (commit_txn_eventually);
+    // thread2 commits a txn load with two sub txns on the same tablets
+    // (commit_txn_with_sub_txn). The lazy committer is held at
+    // "TxnLazyCommitter::commit" until thread2 reaches
+    // "commit_txn_with_sub_txn::advance_last_pending_txn_id", and the pending txn
+    // reported there must be the txn seen at
+    // "commit_txn_eventually::txn_lazy_committer_wait".
+
+    auto txn_kv = get_mem_txn_kv();
+    int64_t db_id = 134179142;
+    int64_t table_id = 3243264;
+    int64_t index_id = 8098394;
+    int64_t partition_id = 32895361;
+
+    std::mutex go_mutex;
+    std::condition_variable go_cv;
+    bool go = false;
+
+    std::atomic<int32_t> commit_txn_immediately_begin_count = {0};
+    std::atomic<int32_t> last_pending_txn_id_count = {0};
+    std::atomic<int32_t> txn_lazy_committer_wait_count = {0};
+    std::atomic<int32_t> immediately_finish_count = {0};
+    std::atomic<int32_t> eventually_finish_count = {0};
+
+    auto sp = SyncPoint::get_instance();
+
+    // txn id passed to the first "commit_txn_with_sub_txn:begin" hit
+    int64_t first_txn_id = 0;
+    sp->set_call_back("commit_txn_with_sub_txn:begin", [&](auto&& args) {
+        std::unique_lock<std::mutex> _lock(go_mutex);
+        commit_txn_immediately_begin_count++;
+        if (commit_txn_immediately_begin_count == 1) {
+            first_txn_id = *try_any_cast<int64_t*>(args[0]);
+            // hold the sub-txn commit until the lazy committer of thread1 is waiting
+            go_cv.wait(_lock, [&] { return txn_lazy_committer_wait_count == 1; });
+            go_cv.notify_all();
+        }
+    });
+
+    // txn id passed to the first "commit_txn_eventually::txn_lazy_committer_wait" hit
+    int64_t second_txn_id = 0;
+    sp->set_call_back("commit_txn_eventually::txn_lazy_committer_wait", [&](auto&& args) {
+        std::unique_lock<std::mutex> _lock(go_mutex);
+        txn_lazy_committer_wait_count++;
+        if (txn_lazy_committer_wait_count == 1) {
+            int64_t txn_id = *try_any_cast<int64_t*>(args[0]);
+            second_txn_id = txn_id;
+            go_cv.notify_all();
+        }
+    });
+
+    sp->set_call_back("commit_txn_with_sub_txn::advance_last_pending_txn_id", [&](auto&& args) {
+        std::unique_lock<std::mutex> _lock(go_mutex);
+        last_pending_txn_id_count++;
+        if (last_pending_txn_id_count == 1) {
+            int64_t last_pending_txn_id = *try_any_cast<int64_t*>(args[0]);
+            // EXPECT (not ASSERT): a fatal failure would return before notify_all()
+            // below and leave "TxnLazyCommitter::commit" blocked forever.
+            EXPECT_EQ(last_pending_txn_id, second_txn_id);
+        }
+        go_cv.notify_all();
+    });
+
+    sp->set_call_back("commit_txn_with_sub_txn::finish", [&](auto&& args) {
+        MetaServiceCode code = *try_any_cast<MetaServiceCode*>(args[0]);
+        ASSERT_EQ(code, MetaServiceCode::OK);
+        std::unique_lock<std::mutex> _lock(go_mutex);
+        immediately_finish_count++;
+        if (immediately_finish_count == 1) {
+            go_cv.notify_all();
+        }
+    });
+
+    sp->set_call_back("commit_txn_eventually::finish", [&](auto&& args) {
+        MetaServiceCode code = *try_any_cast<MetaServiceCode*>(args[0]);
+        ASSERT_EQ(code, MetaServiceCode::OK);
+        eventually_finish_count++;
+    });
+
+    // block the lazy committer's kv commit until thread2 has advanced the pending txn
+    sp->set_call_back("TxnLazyCommitter::commit", [&](auto&& args) {
+        std::unique_lock<std::mutex> _lock(go_mutex);
+        go_cv.wait(_lock, [&] { return last_pending_txn_id_count == 1; });
+    });
+
+    sp->enable_processing();
+
+    auto meta_service = get_meta_service(txn_kv, true);
+    // mock rowset and tablet
+    int64_t tablet_id_base = 1908562;
+    for (int i = 0; i < 10; ++i) {
+        create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id,
+                                 tablet_id_base + i);
+    }
+
+    int64_t txn_id1 = 0;
+    std::thread thread1([&] {
+        {
+            std::unique_lock<std::mutex> _lock(go_mutex);
+            go_cv.wait(_lock, [&] { return go; });
+        }
+        {
+            brpc::Controller cntl;
+            BeginTxnRequest req;
+            req.set_cloud_unique_id("test_cloud_unique_id");
+            TxnInfoPB txn_info_pb;
+            txn_info_pb.set_db_id(db_id);
+            txn_info_pb.set_label("test_label_concurrent_commit_txn_eventually3442");
+            txn_info_pb.add_table_ids(table_id);
+            txn_info_pb.set_timeout_ms(36000);
+            req.mutable_txn_info()->CopyFrom(txn_info_pb);
+            BeginTxnResponse res;
+            meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                    &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+            txn_id1 = res.txn_id();
+            ASSERT_GT(txn_id1, 0);
+        }
+        {
+            for (int i = 0; i < 10; ++i) {
+                auto tmp_rowset =
+                        create_rowset(txn_id1, tablet_id_base + i, index_id, partition_id);
+                CreateRowsetResponse res;
+                commit_rowset(meta_service.get(), tmp_rowset, res);
+                ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+            }
+        }
+
+        {
+            brpc::Controller cntl;
+            CommitTxnRequest req;
+            req.set_cloud_unique_id("test_cloud_unique_id");
+            req.set_db_id(db_id);
+            req.set_txn_id(txn_id1);
+            req.set_is_2pc(false);
+            req.set_enable_txn_lazy_commit(true);
+            CommitTxnResponse res;
+            meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                     &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        }
+    });
+
+    int64_t txn_id2 = 0;
+    std::thread thread2([&] {
+        {
+            std::unique_lock<std::mutex> _lock(go_mutex);
+            go_cv.wait(_lock, [&] { return go; });
+        }
+        {
+            brpc::Controller cntl;
+            BeginTxnRequest req;
+            req.set_cloud_unique_id("test_cloud_unique_id");
+            TxnInfoPB txn_info_pb;
+            txn_info_pb.set_db_id(db_id);
+            txn_info_pb.set_label("test_label_concurrent_commit_txn_eventually5");
+            txn_info_pb.add_table_ids(table_id);
+            txn_info_pb.set_timeout_ms(36000);
+            req.mutable_txn_info()->CopyFrom(txn_info_pb);
+            BeginTxnResponse res;
+            meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                    &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+            txn_id2 = res.txn_id();
+            ASSERT_GT(txn_id2, 0);
+        }
+        {
+            for (int i = 0; i < 10; ++i) {
+                auto tmp_rowset =
+                        create_rowset(txn_id2, tablet_id_base + i, index_id, partition_id);
+                CreateRowsetResponse res;
+                commit_rowset(meta_service.get(), tmp_rowset, res);
+                ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+            }
+        }
+        // the first sub txn reuses the txn id itself
+        int64_t sub_txn_id1 = txn_id2;
+
+        // begin sub_txn2
+        int64_t sub_txn_id2 = -1;
+        {
+            brpc::Controller cntl;
+            BeginSubTxnRequest req;
+            req.set_cloud_unique_id("test_cloud_unique_id");
+            req.set_txn_id(txn_id2);
+            req.set_sub_txn_num(0);
+            req.set_db_id(db_id);
+            req.set_label("test_label_concurrent_commit_txn_eventually5_sub");
+            req.mutable_table_ids()->Add(table_id);
+            req.mutable_table_ids()->Add(table_id);
+            BeginSubTxnResponse res;
+            meta_service->begin_sub_txn(
+                    reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res,
+                    nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+            ASSERT_EQ(res.txn_info().table_ids().size(), 2);
+            ASSERT_EQ(res.txn_info().sub_txn_ids().size(), 1);
+            ASSERT_TRUE(res.has_sub_txn_id());
+            sub_txn_id2 = res.sub_txn_id();
+            ASSERT_EQ(sub_txn_id2, res.txn_info().sub_txn_ids()[0]);
+        }
+        {
+            for (int i = 0; i < 10; ++i) {
+                auto tmp_rowset =
+                        create_rowset(sub_txn_id2, tablet_id_base + i, index_id, partition_id);
+                CreateRowsetResponse res;
+                commit_rowset(meta_service.get(), tmp_rowset, res);
+                ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+            }
+        }
+
+        {
+            brpc::Controller cntl;
+            CommitTxnRequest req;
+            req.set_cloud_unique_id("test_cloud_unique_id");
+            req.set_db_id(db_id);
+            req.set_txn_id(txn_id2);
+            req.set_is_txn_load(true);
+
+            SubTxnInfo sub_txn_info1;
+            sub_txn_info1.set_sub_txn_id(sub_txn_id1);
+            sub_txn_info1.set_table_id(table_id);
+            for (int i = 0; i < 10; ++i) {
+                sub_txn_info1.mutable_base_tablet_ids()->Add(tablet_id_base + i);
+            }
+
+            SubTxnInfo sub_txn_info2;
+            sub_txn_info2.set_sub_txn_id(sub_txn_id2);
+            sub_txn_info2.set_table_id(table_id);
+            for (int i = 0; i < 10; ++i) {
+                // each sub txn carries its own base tablet ids
+                sub_txn_info2.mutable_base_tablet_ids()->Add(tablet_id_base + i);
+            }
+
+            req.mutable_sub_txn_infos()->Add(std::move(sub_txn_info1));
+            req.mutable_sub_txn_infos()->Add(std::move(sub_txn_info2));
+            CommitTxnResponse res;
+            meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                     &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        }
+    });
+
+    std::unique_lock<std::mutex> go_lock(go_mutex);
+    go = true;
+    go_lock.unlock();
+    go_cv.notify_all();
+
+    thread1.join();
+    thread2.join();
+
+    sp->clear_all_call_backs();
+    sp->clear_trace();
+    sp->disable_processing();
+    ASSERT_EQ(commit_txn_immediately_begin_count, 2);
+    ASSERT_EQ(last_pending_txn_id_count, 1);
+    ASSERT_EQ(immediately_finish_count, 1);
+    ASSERT_EQ(eventually_finish_count, 1);
+
+    {
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        for (int i = 0; i < 10; ++i) {
+            int64_t tablet_id = tablet_id_base + i;
+            check_tablet_idx_db_id(txn, db_id, tablet_id);
+
+            check_tmp_rowset_not_exist(txn, tablet_id, first_txn_id);
+            check_rowset_meta_exist(txn, tablet_id, 2);
+
+            check_tmp_rowset_not_exist(txn, tablet_id, second_txn_id);
+            check_rowset_meta_exist(txn, tablet_id, 4);
+        }
+    }
+}
+
TEST(TxnLazyCommitTest, RowsetMetaSizeExceedTest) {
auto txn_kv = get_mem_txn_kv();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]