This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4bb71395a8e [fix](cloud) Fix tablet_id is zero when txn lazy commit
(#42043)
4bb71395a8e is described below
commit 4bb71395a8eef5e27a9db5c924a01d0c346c439d
Author: Lei Zhang <[email protected]>
AuthorDate: Mon Oct 28 15:24:35 2024 +0800
[fix](cloud) Fix tablet_id is zero when txn lazy commit (#42043)
* When the recycler and ms convert tmp rowsets at the same time, and each
finishes its convert task for different partitions separately, the tablet_id
variable may end up being zero
---
cloud/src/meta-service/meta_service_txn.cpp | 4 +-
cloud/src/meta-service/txn_lazy_committer.cpp | 60 +++++++++++++++++++++------
2 files changed, 50 insertions(+), 14 deletions(-)
diff --git a/cloud/src/meta-service/meta_service_txn.cpp
b/cloud/src/meta-service/meta_service_txn.cpp
index b372c360289..03cb7866e1a 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -1122,7 +1122,7 @@ void commit_txn_immediately(
std::tie(code, msg) = task->wait();
if (code != MetaServiceCode::OK) {
LOG(WARNING) << "advance_last_txn failed last_txn=" <<
last_pending_txn_id
- << " code=" << code << "msg=" << msg;
+ << " code=" << code << " msg=" << msg;
return;
}
last_pending_txn_id = 0;
@@ -1654,7 +1654,7 @@ void commit_txn_eventually(
std::tie(code, msg) = task->wait();
if (code != MetaServiceCode::OK) {
LOG(WARNING) << "advance_last_txn failed last_txn=" <<
last_pending_txn_id
- << " code=" << code << "msg=" << msg;
+ << " code=" << code << " msg=" << msg;
return;
}
diff --git a/cloud/src/meta-service/txn_lazy_committer.cpp
b/cloud/src/meta-service/txn_lazy_committer.cpp
index 94baa217024..fe13f7f8352 100644
--- a/cloud/src/meta-service/txn_lazy_committer.cpp
+++ b/cloud/src/meta-service/txn_lazy_committer.cpp
@@ -303,16 +303,17 @@ void TxnLazyCommitTask::commit() {
code_ = MetaServiceCode::OK;
msg_.clear();
int64_t db_id;
- std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>
tmp_rowset_metas;
- scan_tmp_rowset(instance_id_, txn_id_, txn_kv_, code_, msg_,
&db_id, &tmp_rowset_metas);
+ std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>
all_tmp_rowset_metas;
+ scan_tmp_rowset(instance_id_, txn_id_, txn_kv_, code_, msg_,
&db_id,
+ &all_tmp_rowset_metas);
if (code_ != MetaServiceCode::OK) {
LOG(WARNING) << "scan_tmp_rowset failed, txn_id=" << txn_id_
<< " code=" << code_;
break;
}
VLOG_DEBUG << "txn_id=" << txn_id_
- << " tmp_rowset_metas.size()=" <<
tmp_rowset_metas.size();
- if (tmp_rowset_metas.size() == 0) {
+ << " tmp_rowset_metas.size()=" <<
all_tmp_rowset_metas.size();
+ if (all_tmp_rowset_metas.size() == 0) {
LOG(INFO) << "empty tmp_rowset_metas, txn_id=" << txn_id_;
}
@@ -320,7 +321,7 @@ void TxnLazyCommitTask::commit() {
std::unordered_map<int64_t,
std::vector<std::pair<std::string,
doris::RowsetMetaCloudPB>>>
partition_to_tmp_rowset_metas;
- for (auto& [tmp_rowset_key, tmp_rowset_pb] : tmp_rowset_metas) {
+ for (auto& [tmp_rowset_key, tmp_rowset_pb] : all_tmp_rowset_metas)
{
partition_to_tmp_rowset_metas[tmp_rowset_pb.partition_id()].emplace_back();
partition_to_tmp_rowset_metas[tmp_rowset_pb.partition_id()].back().first =
tmp_rowset_key;
@@ -346,7 +347,6 @@ void TxnLazyCommitTask::commit() {
}
if (code_ != MetaServiceCode::OK) break;
- DCHECK(tmp_rowset_metas.size() > 0);
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
@@ -358,13 +358,39 @@ void TxnLazyCommitTask::commit() {
}
int64_t table_id = -1;
- for (auto& [tmp_rowset_key, tmp_rowset_pb] : tmp_rowset_metas)
{
- if (table_id <= 0) {
- table_id =
tablet_ids[tmp_rowset_pb.tablet_id()].table_id();
+ DCHECK(tmp_rowset_metas.size() > 0);
+ if (table_id <= 0) {
+ if (tablet_ids.size() > 0) {
+ // get table_id from memory cache
+ table_id = tablet_ids.begin()->second.table_id();
+ } else {
+ // get table_id from storage
+ int64_t first_tablet_id =
tmp_rowset_metas.begin()->second.tablet_id();
+ std::string tablet_idx_key =
+ meta_tablet_idx_key({instance_id_,
first_tablet_id});
+ std::string tablet_idx_val;
+ err = txn->get(tablet_idx_key, &tablet_idx_val, true);
+ if (TxnErrorCode::TXN_OK != err) {
+ code_ = err == TxnErrorCode::TXN_KEY_NOT_FOUND
+ ? MetaServiceCode::TXN_ID_NOT_FOUND
+ : cast_as<ErrCategory::READ>(err);
+ ss << "failed to get tablet idx, txn_id=" <<
txn_id_
+ << " key=" << hex(tablet_idx_key) << " err=" <<
err;
+ msg_ = ss.str();
+ LOG(WARNING) << msg_;
+ break;
+ }
+
+ TabletIndexPB tablet_idx_pb;
+ if (!tablet_idx_pb.ParseFromString(tablet_idx_val)) {
+ code_ = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ ss << "failed to parse tablet idx pb txn_id=" <<
txn_id_
+ << " key=" << hex(tablet_idx_key);
+ msg_ = ss.str();
+ break;
+ }
+ table_id = tablet_idx_pb.table_id();
}
- txn->remove(tmp_rowset_key);
- LOG(INFO) << "remove tmp_rowset_key=" <<
hex(tmp_rowset_key)
- << " txn_id=" << txn_id_;
}
DCHECK(table_id > 0);
@@ -415,6 +441,12 @@ void TxnLazyCommitTask::commit() {
LOG(INFO) << "put ver_key=" << hex(ver_key) << " txn_id="
<< txn_id_
<< " version_pb=" <<
version_pb.ShortDebugString();
+ for (auto& [tmp_rowset_key, tmp_rowset_pb] :
tmp_rowset_metas) {
+ txn->remove(tmp_rowset_key);
+ LOG(INFO) << "remove tmp_rowset_key=" <<
hex(tmp_rowset_key)
+ << " txn_id=" << txn_id_;
+ }
+
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code_ = cast_as<ErrCategory::COMMIT>(err);
@@ -424,6 +456,10 @@ void TxnLazyCommitTask::commit() {
}
}
}
+ if (code_ != MetaServiceCode::OK) {
+ LOG(WARNING) << "txn_id=" << txn_id_ << " code=" << code_ << "
msg=" << msg_;
+ break;
+ }
make_committed_txn_visible(instance_id_, db_id, txn_id_, txn_kv_,
code_, msg_);
} while (false);
} while (code_ == MetaServiceCode::KV_TXN_CONFLICT &&
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]