SWJTU-ZhangLei commented on code in PR #54653:
URL: https://github.com/apache/doris/pull/54653#discussion_r2275265498
##########
cloud/src/meta-service/meta_service_txn.cpp:
##########
@@ -2060,538 +2060,435 @@ void MetaServiceImpl::commit_txn_eventually(
void MetaServiceImpl::commit_txn_with_sub_txn(const CommitTxnRequest* request,
CommitTxnResponse* response,
MetaServiceCode& code,
std::string& msg, const
std::string& instance_id,
- KVStats& stats) {
+ int64_t db_id, KVStats& stats) {
std::stringstream ss;
int64_t txn_id = request->txn_id();
auto sub_txn_infos = request->sub_txn_infos();
- // Create a readonly txn for scan tmp rowset
- std::unique_ptr<Transaction> txn;
- TxnErrorCode err = txn_kv_->create_txn(&txn);
- if (err != TxnErrorCode::TXN_OK) {
- code = cast_as<ErrCategory::CREATE>(err);
- ss << "filed to create txn, txn_id=" << txn_id << " err=" << err;
- msg = ss.str();
- LOG(WARNING) << msg;
- return;
- }
- DORIS_CLOUD_DEFER {
- if (txn == nullptr) return;
- stats.get_bytes += txn->get_bytes();
- stats.put_bytes += txn->put_bytes();
- stats.del_bytes += txn->delete_bytes();
- stats.get_counter += txn->num_get_keys();
- stats.put_counter += txn->num_put_keys();
- stats.del_counter += txn->num_del_keys();
- };
-
- // Get db id with txn id
- std::string index_val;
- const std::string index_key = txn_index_key({instance_id, txn_id});
- err = txn->get(index_key, &index_val);
- if (err != TxnErrorCode::TXN_OK) {
- code = cast_as<ErrCategory::READ>(err);
- ss << "failed to get db id, txn_id=" << txn_id << " err=" << err;
- msg = ss.str();
- LOG(WARNING) << msg;
- return;
- }
-
- TxnIndexPB index_pb;
- if (!index_pb.ParseFromString(index_val)) {
- code = MetaServiceCode::PROTOBUF_PARSE_ERR;
- ss << "failed to parse txn_index_pb, txn_id=" << txn_id;
- msg = ss.str();
- LOG(WARNING) << msg;
- return;
- }
-
- DCHECK(index_pb.has_tablet_index() == true);
- DCHECK(index_pb.tablet_index().has_db_id() == true);
- int64_t db_id = index_pb.tablet_index().db_id();
-
- // Get temporary rowsets involved in the txn
std::map<int64_t, std::vector<std::pair<std::string,
doris::RowsetMetaCloudPB>>>
sub_txn_to_tmp_rowsets_meta;
for (const auto& sub_txn_info : sub_txn_infos) {
auto sub_txn_id = sub_txn_info.sub_txn_id();
- // This is a range scan
- MetaRowsetTmpKeyInfo rs_tmp_key_info0 {instance_id, sub_txn_id, 0};
- MetaRowsetTmpKeyInfo rs_tmp_key_info1 {instance_id, sub_txn_id + 1, 0};
- std::string rs_tmp_key0;
- std::string rs_tmp_key1;
- meta_rowset_tmp_key(rs_tmp_key_info0, &rs_tmp_key0);
- meta_rowset_tmp_key(rs_tmp_key_info1, &rs_tmp_key1);
- // Get rowset meta that should be commited
- // tmp_rowset_key -> rowset_meta
std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>
tmp_rowsets_meta;
-
- int num_rowsets = 0;
- DORIS_CLOUD_DEFER_COPY(rs_tmp_key_info0, rs_tmp_key_info1) {
- LOG(INFO) << "get tmp rowset meta, txn_id=" << txn_id << ",
sub_txn_id=" << sub_txn_id
- << " num_rowsets=" << num_rowsets << " range=[" <<
hex(rs_tmp_key0) << ","
- << hex(rs_tmp_key1) << ")";
+ scan_tmp_rowset(instance_id, sub_txn_id, txn_kv_, code, msg,
&tmp_rowsets_meta, &stats);
+ if (code != MetaServiceCode::OK) {
+ LOG(WARNING) << "scan_tmp_rowset failed, txn_id=" << txn_id
+ << ", sub_txn_id=" << sub_txn_id << " code=" << code;
+ return;
+ }
+ sub_txn_to_tmp_rowsets_meta.emplace(sub_txn_id,
std::move(tmp_rowsets_meta));
+ }
+ do {
+ // Create a readonly txn for scan tmp rowset
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::CREATE>(err);
+ ss << "filed to create txn, txn_id=" << txn_id << " err=" << err;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+ DORIS_CLOUD_DEFER {
+ if (txn == nullptr) return;
+ stats.get_bytes += txn->get_bytes();
+ stats.put_bytes += txn->put_bytes();
+ stats.del_bytes += txn->delete_bytes();
+ stats.get_counter += txn->num_get_keys();
+ stats.put_counter += txn->num_put_keys();
+ stats.del_counter += txn->num_del_keys();
};
- std::unique_ptr<RangeGetIterator> it;
- do {
- err = txn->get(rs_tmp_key0, rs_tmp_key1, &it, true);
- if (err == TxnErrorCode::TXN_TOO_OLD) {
- err = txn_kv_->create_txn(&txn);
- if (err == TxnErrorCode::TXN_OK) {
- err = txn->get(rs_tmp_key0, rs_tmp_key1, &it, true);
- }
- }
- if (err != TxnErrorCode::TXN_OK) {
- code = cast_as<ErrCategory::READ>(err);
- ss << "internal error, failed to get tmp rowset while
committing, txn_id=" << txn_id
+ // Get txn info with db_id and txn_id
+ std::string info_val; // Will be reused when saving updated txn
+ const std::string info_key = txn_info_key({instance_id, db_id,
txn_id});
+ err = txn->get(info_key, &info_val);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ?
MetaServiceCode::TXN_ID_NOT_FOUND
+ :
cast_as<ErrCategory::READ>(err);
+ if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+ ss << "transaction [" << txn_id << "] not found, db_id=" <<
db_id;
+ } else {
+ ss << "failed to get txn_info, db_id=" << db_id << " txn_id="
<< txn_id
<< " err=" << err;
- msg = ss.str();
- LOG(WARNING) << msg;
- return;
}
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
- while (it->has_next()) {
- auto [k, v] = it->next();
- LOG(INFO) << "range_get rowset_tmp_key=" << hex(k) << "
txn_id=" << txn_id;
- tmp_rowsets_meta.emplace_back();
- if (!tmp_rowsets_meta.back().second.ParseFromArray(v.data(),
v.size())) {
- code = MetaServiceCode::PROTOBUF_PARSE_ERR;
- ss << "malformed rowset meta, unable to initialize,
txn_id=" << txn_id
- << " key=" << hex(k);
- msg = ss.str();
- LOG(WARNING) << msg;
- return;
+ TxnInfoPB txn_info;
+ if (!txn_info.ParseFromString(info_val)) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" <<
txn_id;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+
+ // TODO: do more check like txn state
+ DCHECK(txn_info.txn_id() == txn_id);
+ if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) {
+ code = MetaServiceCode::TXN_ALREADY_ABORTED;
+ ss << "transaction is already aborted: db_id=" << db_id << "
txn_id=" << txn_id;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+
+ if (txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) {
+ code = MetaServiceCode::OK;
+ ss << "transaction is already visible: db_id=" << db_id << "
txn_id=" << txn_id;
+ msg = ss.str();
+ LOG(INFO) << msg;
+ response->mutable_txn_info()->CopyFrom(txn_info);
+ return;
+ }
+
+ LOG(INFO) << "txn_id=" << txn_id << " txn_info=" <<
txn_info.ShortDebugString();
+
+ AnnotateTag txn_tag("txn_id", txn_id);
+ // Prepare rowset meta and new_versions
+ std::unordered_map<int64_t, TabletIndexPB> tablet_ids;
+ {
+ // Read tablet indexes in batch.
+ std::vector<int64_t> acquired_tablet_ids;
+ for (const auto& [_, tmp_rowsets_meta] :
sub_txn_to_tmp_rowsets_meta) {
+ for (const auto& [_, i] : tmp_rowsets_meta) {
+ acquired_tablet_ids.push_back(i.tablet_id());
}
- // Save keys that will be removed later
- tmp_rowsets_meta.back().first = std::string(k.data(),
k.size());
- ++num_rowsets;
- if (!it->has_next()) rs_tmp_key0 = k;
}
- rs_tmp_key0.push_back('\x00'); // Update to next smallest key for
iteration
- } while (it->more());
+ std::tie(code, msg) =
+ get_tablet_indexes(txn.get(), &tablet_ids, instance_id,
acquired_tablet_ids);
+ if (code != MetaServiceCode::OK) {
+ return;
+ }
+ }
- VLOG_DEBUG << "txn_id=" << txn_id << " sub_txn_id=" << sub_txn_id
- << " tmp_rowsets_meta.size()=" << tmp_rowsets_meta.size();
- sub_txn_to_tmp_rowsets_meta.emplace(sub_txn_id,
std::move(tmp_rowsets_meta));
- }
- stats.get_bytes += txn->get_bytes();
- stats.get_counter += txn->num_get_keys();
- // Create a read/write txn for guarantee consistency
- txn.reset();
- err = txn_kv_->create_txn(&txn);
- if (err != TxnErrorCode::TXN_OK) {
- code = cast_as<ErrCategory::CREATE>(err);
- ss << "filed to create txn, txn_id=" << txn_id << " err=" << err;
- msg = ss.str();
- LOG(WARNING) << msg;
- return;
- }
+ // {table/partition} -> version
+ std::unordered_map<int64_t, std::tuple<int64_t, int64_t>>
partition_indexes;
+ for (auto& [tablet_id, tablet_idx] : tablet_ids) {
+ int64_t table_id = tablet_idx.table_id();
+ int64_t partition_id = tablet_idx.partition_id();
+ partition_indexes.insert({partition_id, {db_id, table_id}});
+ }
- // Get txn info with db_id and txn_id
- std::string info_val; // Will be reused when saving updated txn
- const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
- err = txn->get(info_key, &info_val);
- if (err != TxnErrorCode::TXN_OK) {
- code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ?
MetaServiceCode::TXN_ID_NOT_FOUND
- :
cast_as<ErrCategory::READ>(err);
- ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" <<
txn_id << " err=" << err;
- msg = ss.str();
- LOG(WARNING) << msg;
- return;
- }
+ int64_t last_pending_txn_id = 0;
+ std::unordered_map<int64_t, int64_t> new_versions;
+ std::tie(code, msg) = get_partition_versions(txn.get(), &new_versions,
&last_pending_txn_id,
+ instance_id,
partition_indexes);
+ if (code != MetaServiceCode::OK) {
+ return;
+ }
- TxnInfoPB txn_info;
- if (!txn_info.ParseFromString(info_val)) {
- code = MetaServiceCode::PROTOBUF_PARSE_ERR;
- ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" <<
txn_id;
- msg = ss.str();
- LOG(WARNING) << msg;
- return;
- }
+ if (last_pending_txn_id > 0) {
+ stats.get_bytes += txn->get_bytes();
+ stats.get_counter += txn->num_get_keys();
+ txn.reset();
+ std::shared_ptr<TxnLazyCommitTask> task =
Review Comment:
Could you add a unit test to cover this logic?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]