This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0.6
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 7bc546c035626dda874ddc39bd0dc241281c64ca
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jun 11 10:59:56 2025 +0800
branch-3.0: [fix](cloud) potential data race when retrying prepare/commit
rowset for load #51129 (#51483)
Cherry-picked from #51129
Co-authored-by: Xin Liao <[email protected]>
---
cloud/src/common/config.h | 3 +
cloud/src/meta-service/meta_service.cpp | 97 ++++++++++++++++++++++++++++-
cloud/src/meta-service/meta_service_txn.cpp | 1 +
cloud/test/meta_service_test.cpp | 67 ++++++++++++++++++++
gensrc/proto/cloud.proto | 1 +
5 files changed, 168 insertions(+), 1 deletion(-)
diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index 1e40c5cfb25..3bb791f71e9 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -219,6 +219,9 @@ CONF_String(kerberos_krb5_conf_path, "/etc/krb5.conf");
CONF_mBool(enable_distinguish_hdfs_path, "true");
+// If enabled, prepare/commit rowset requests that carry a load_id are rejected unless
+// the load's txn is still PREPARED (guards against stale retries after a txn finishes).
+CONF_mBool(enable_load_txn_status_check, "true");
+
// Declare a selection strategy for those servers have many ips.
// Note that there should at most one ip match this list.
// this is a list in semicolon-delimited format, in CIDR notation,
diff --git a/cloud/src/meta-service/meta_service.cpp
b/cloud/src/meta-service/meta_service.cpp
index 45e64f475f6..d26e2fde1ec 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -991,6 +991,73 @@ static void fill_schema_from_dict(MetaServiceCode& code,
std::string& msg,
existed_rowset_meta->CopyFrom(metas.Get(0));
}
+/**
+ * Check whether the transaction referenced by `txn_id` is in `expect_status`.
+ *
+ * The db id is resolved through the txn index key. If the index records a
+ * `parent_txn_id` (written by begin_sub_txn), the status is read from the
+ * parent's TxnInfoPB instead of the one keyed by `txn_id`.
+ * On failure, `code` and `msg` are set and false is returned.
+ *
+ * NOTE(review): sub-txn index entries written before `parent_txn_id` existed do not
+ * carry it, so the info lookup below uses the sub-txn id for them; confirm this is
+ * acceptable across a rolling upgrade.
+ *
+ * @param expect_status The expected transaction status.
+ * @param txn           KV transaction used for the index/info reads.
+ * @param instance_id   The instance ID associated with the transaction.
+ * @param txn_id        The (sub-)transaction ID carried by the request.
+ * @param code          Set to the error code on failure.
+ * @param msg           Set to the error message on failure.
+ * @return true if the transaction status matches `expect_status`, false otherwise.
+ */
+static bool check_transaction_status(TxnStatusPB expect_status, Transaction* txn,
+                                     const std::string& instance_id, int64_t txn_id,
+                                     MetaServiceCode& code, std::string& msg) {
+    // Resolve the db id (and, for sub-txns, the parent txn id) from the txn index.
+    std::string index_val;
+    const std::string index_key = txn_index_key({instance_id, txn_id});
+    TxnErrorCode err = txn->get(index_key, &index_val);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::READ>(err);
+        msg = fmt::format("failed to get db id, txn_id={} err={}", txn_id, err);
+        return false;
+    }
+
+    TxnIndexPB index_pb;
+    if (!index_pb.ParseFromString(index_val)) {
+        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+        msg = fmt::format("failed to parse txn_index_pb, txn_id={}", txn_id);
+        return false;
+    }
+
+    DCHECK(index_pb.has_tablet_index());
+    DCHECK(index_pb.tablet_index().has_db_id());
+    if (!index_pb.has_tablet_index() || !index_pb.tablet_index().has_db_id()) {
+        LOG(WARNING) << fmt::format(
+                "txn_index_pb is malformed, tablet_index has no db_id, txn_id={}", txn_id);
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = fmt::format("has no db_id in TxnIndexPB, txn_id={}", txn_id);
+        return false;
+    }
+    const auto db_id = index_pb.tablet_index().db_id();
+    // Keep the caller's txn_id intact for diagnostics; look the status up under the
+    // parent id when this index entry belongs to a sub-txn.
+    const int64_t status_txn_id =
+            index_pb.has_parent_txn_id() ? index_pb.parent_txn_id() : txn_id;
+
+    const std::string info_key = txn_info_key({instance_id, db_id, status_txn_id});
+    std::string info_val;
+    err = txn->get(info_key, &info_val);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::READ>(err);
+        msg = fmt::format("failed to get txn, txn_id={}, request_txn_id={}, err={}",
+                          status_txn_id, txn_id, err);
+        return false;
+    }
+    TxnInfoPB txn_info;
+    if (!txn_info.ParseFromString(info_val)) {
+        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+        msg = fmt::format("failed to parse txn_info, db_id={} txn_id={} request_txn_id={}",
+                          db_id, status_txn_id, txn_id);
+        return false;
+    }
+    if (txn_info.status() != expect_status) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        // Print enum names rather than raw integers so the message is self-explanatory.
+        msg = fmt::format("txn is not in {} state, txn_id={}, request_txn_id={}, txn_status={}",
+                          TxnStatusPB_Name(expect_status), status_txn_id, txn_id,
+                          TxnStatusPB_Name(txn_info.status()));
+        return false;
+    }
+    return true;
+}
+
/**
* 1. Check and confirm tmp rowset kv does not exist
* 2. Construct recycle rowset kv which contains object path
@@ -1033,6 +1100,20 @@ void
MetaServiceImpl::prepare_rowset(::google::protobuf::RpcController* controll
return;
}
+    // Check if the prepare rowset request is invalid.
+    // If the transaction is no longer PREPARED (e.g. already committed or aborted),
+    // this prepare rowset is a stale retry, e.g. after an RPC timeout.
+    // In this case, do not write the recycle key again, otherwise it may cause data loss.
+    // If the rowset has a load id, it is a load request; otherwise it is a
+    // compaction/schema-change request.
is a
+ // compaction/sc request.
+ if (config::enable_load_txn_status_check && rowset_meta.has_load_id() &&
+ !check_transaction_status(TxnStatusPB::TXN_STATUS_PREPARED, txn.get(),
instance_id,
+ rowset_meta.txn_id(), code, msg)) {
+ LOG(WARNING) << "prepare rowset failed, txn_id=" <<
rowset_meta.txn_id()
+ << ", tablet_id=" << tablet_id << ", rowset_id=" <<
rowset_id
+ << ", rowset_state=" << rowset_meta.rowset_state() << ",
msg=" << msg;
+ return;
+ }
+
// Check if commit key already exists.
std::string val;
err = txn->get(tmp_rs_key, &val);
@@ -1156,6 +1237,20 @@ void
MetaServiceImpl::commit_rowset(::google::protobuf::RpcController* controlle
return;
}
+    // Check if the commit rowset request is invalid.
+    // If the transaction is no longer PREPARED (e.g. already committed or aborted),
+    // this commit rowset is a stale retry, e.g. after an RPC timeout.
+    // In this case, do not write the recycle key again, otherwise it may cause data loss.
+    // If the rowset has a load id, it is a load request; otherwise it is a
+    // compaction/schema-change request.
is a
+ // compaction/sc request.
+ if (config::enable_load_txn_status_check && rowset_meta.has_load_id() &&
+ !check_transaction_status(TxnStatusPB::TXN_STATUS_PREPARED, txn.get(),
instance_id,
+ rowset_meta.txn_id(), code, msg)) {
+ LOG(WARNING) << "commit rowset failed, txn_id=" << rowset_meta.txn_id()
+ << ", tablet_id=" << tablet_id << ", rowset_id=" <<
rowset_id
+ << ", rowset_state=" << rowset_meta.rowset_state() << ",
msg=" << msg;
+ return;
+ }
+
// Check if commit key already exists.
std::string existed_commit_val;
err = txn->get(tmp_rs_key, &existed_commit_val);
@@ -2787,4 +2882,4 @@ void
MetaServiceImpl::get_schema_dict(::google::protobuf::RpcController* control
response->mutable_schema_dict()->Swap(&schema_dict);
}
-} // namespace doris::cloud
+} // namespace doris::cloud
\ No newline at end of file
diff --git a/cloud/src/meta-service/meta_service_txn.cpp
b/cloud/src/meta-service/meta_service_txn.cpp
index 21e7dbe1a0c..490c7bb616b 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -3190,6 +3190,7 @@ void
MetaServiceImpl::begin_sub_txn(::google::protobuf::RpcController* controlle
std::string index_val;
TxnIndexPB index_pb;
index_pb.mutable_tablet_index()->set_db_id(db_id);
+ index_pb.set_parent_txn_id(txn_id);
if (!index_pb.SerializeToString(&index_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize txn_index_pb "
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index 0e8e543c494..bd593ce783c 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -8930,4 +8930,71 @@ TEST(MetaServiceTest, AddObjInfoWithRole) {
SyncPoint::get_instance()->disable_processing();
SyncPoint::get_instance()->clear_all_call_backs();
}
+
+TEST(MetaServiceTest, StalePrepareRowset) {
+    auto meta_service = get_meta_service();
+
+    int64_t table_id = 1;
+    int64_t partition_id = 1;
+    int64_t tablet_id = 1;
+    int64_t db_id = 100201;
+    std::string label = "test_prepare_rowset";
+    create_tablet(meta_service.get(), table_id, 1, partition_id, tablet_id);
+
+    int64_t txn_id = 0;
+    ASSERT_NO_FATAL_FAILURE(begin_txn(meta_service.get(), db_id, label, table_id, txn_id));
+    CreateRowsetResponse res;
+    auto rowset = create_rowset(txn_id, tablet_id, partition_id);
+    // A load_id marks the rowset as load-produced, which enables the txn status check.
+    rowset.mutable_load_id()->set_hi(123);
+    rowset.mutable_load_id()->set_lo(456);
+    prepare_rowset(meta_service.get(), rowset, res);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;
+    res.Clear();
+    ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), rowset, res));
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;
+
+    // While the txn is still PREPARED, a retried prepare is reported as already existing.
+    res.Clear();
+    prepare_rowset(meta_service.get(), rowset, res);
+    ASSERT_TRUE(res.status().msg().find("rowset already exists") != std::string::npos)
+            << res.status().msg();
+    ASSERT_EQ(res.status().code(), MetaServiceCode::ALREADY_EXISTED) << res.status().code();
+
+    // Once the txn is committed, a stale prepare retry must be rejected.
+    ASSERT_NO_FATAL_FAILURE(commit_txn(meta_service.get(), db_id, txn_id, label));
+    res.Clear();
+    prepare_rowset(meta_service.get(), rowset, res);
+    ASSERT_TRUE(res.status().msg().find("txn is not in") != std::string::npos)
+            << res.status().msg();
+    ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT) << res.status().code();
+}
+
+TEST(MetaServiceTest, StaleCommitRowset) {
+    auto meta_service = get_meta_service();
+
+    int64_t table_id = 1;
+    int64_t partition_id = 1;
+    int64_t tablet_id = 1;
+    int64_t db_id = 100201;
+    std::string label = "test_commit_rowset";
+    create_tablet(meta_service.get(), table_id, 1, partition_id, tablet_id);
+
+    int64_t txn_id = 0;
+    ASSERT_NO_FATAL_FAILURE(begin_txn(meta_service.get(), db_id, label, table_id, txn_id));
+    CreateRowsetResponse res;
+    auto rowset = create_rowset(txn_id, tablet_id, partition_id);
+    // A load_id marks the rowset as load-produced, which enables the txn status check.
+    rowset.mutable_load_id()->set_hi(123);
+    rowset.mutable_load_id()->set_lo(456);
+    prepare_rowset(meta_service.get(), rowset, res);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;
+    res.Clear();
+    ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), rowset, res));
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;
+
+    // While the txn is still PREPARED, re-committing the same rowset succeeds.
+    res.Clear();
+    ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), rowset, res));
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;
+
+    // Once the txn is committed, a stale commit retry must be rejected.
+    ASSERT_NO_FATAL_FAILURE(commit_txn(meta_service.get(), db_id, txn_id, label));
+    res.Clear();
+    ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), rowset, res));
+    ASSERT_TRUE(res.status().msg().find("txn is not in") != std::string::npos)
+            << res.status().msg();
+    ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT) << res.status().code();
+}
+
} // namespace doris::cloud
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 1b6cdd9bd5a..fa1336a2e7a 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -385,6 +385,7 @@ message TxnLabelPB {
// txn_id -> db_id
message TxnIndexPB {
optional TabletIndexPB tablet_index = 1;
+ // Parent txn id, written by begin_sub_txn. When present, check_transaction_status
+ // reads the txn status stored under this id instead of the indexed txn's own id.
+ optional int64 parent_txn_id = 2;
}
message TxnInfoPB {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]