This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 8b6044bb77d branch-4.1: [fix](cloud) Validate recycle rowset key state
during commit rowset #63985 (#64287)
8b6044bb77d is described below
commit 8b6044bb77dcce64bcf64698dec1bd8f06919af4
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jun 10 11:40:53 2026 +0800
branch-4.1: [fix](cloud) Validate recycle rowset key state during commit
rowset #63985 (#64287)
Cherry-picked from #63985
Co-authored-by: Yixuan Wang <[email protected]>
---
cloud/src/meta-service/meta_service.cpp | 125 +++++++++++++++++++-------------
cloud/test/meta_service_test.cpp | 87 +++++++++++++++++++++-
2 files changed, 159 insertions(+), 53 deletions(-)
diff --git a/cloud/src/meta-service/meta_service.cpp
b/cloud/src/meta-service/meta_service.cpp
index c6f423381a9..2fe48980f6f 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -2614,73 +2614,45 @@ void
MetaServiceImpl::prepare_rowset(::google::protobuf::RpcController* controll
}
}
-// Check recycle_rowset_key to ensure idempotency for commit_rowset operation.
-// The precondition for commit_rowset is that prepare_rowset has been
successfully executed,
-// which creates the recycle_rowset_key. Therefore, we only need to check if
the
-// recycle_rowset_key exists to determine if this is a duplicate request:
-// - If key not found: commit_rowset has already been executed and remove the
key,
-// this is a duplicate request and should be rejected.
-// - If key exists but marked as recycled: the rowset data has been recycled
by recycler,
-// this request should be rejected to prevent data inconsistency.
-// - If key exists and not marked: this is a valid commit_rowset request,
proceed normally.
-// Note: We don't need to check txn/job status separately because
prepare_rowset has already
-// validated them, and the existence of recycle_rowset_key is sufficient to
guarantee idempotency.
-int check_idempotent_for_txn_or_job(Transaction* txn, const std::string&
recycle_rs_key,
- doris::RowsetMetaCloudPB& rowset_meta,
- const std::string& instance_id, int64_t
tablet_id,
- const std::string& rowset_id, const
std::string& tablet_job_id,
- bool is_versioned_read, ResourceManager*
resource_mgr,
- MetaServiceCode& code, std::string& msg) {
- if (!rowset_meta.has_delete_predicate() &&
config::enable_recycle_delete_rowset_key_check) {
+// Returns false only when reading or parsing the recycle rowset key fails, or
when the rowset has
+// already been marked as recycled. recycle_rs_key_exists tells the caller
whether the key exists.
+bool check_recycle_rowset_key(Transaction* txn, const std::string&
recycle_rs_key,
+ const doris::RowsetMetaCloudPB& rowset_meta,
+ bool* recycle_rs_key_exists, MetaServiceCode&
code,
+ std::string& msg) {
+ *recycle_rs_key_exists = false;
+ if (!rowset_meta.has_delete_predicate()) {
std::string recycle_rs_val;
TxnErrorCode err = txn->get(recycle_rs_key, &recycle_rs_val);
if (err == TxnErrorCode::TXN_OK) {
+ *recycle_rs_key_exists = true;
RecycleRowsetPB recycle_rowset;
if (!recycle_rowset.ParseFromString(recycle_rs_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = fmt::format("malformed recycle rowset value. key={}",
hex(recycle_rs_key));
- return 1;
+ return false;
}
auto rs_meta = recycle_rowset.rowset_meta();
if (rs_meta.has_is_recycled() && rs_meta.is_recycled()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = fmt::format("rowset has already been marked as recycled,
key={}, rs_meta={}",
hex(recycle_rs_key),
rs_meta.ShortDebugString());
- return 1;
+ return false;
}
} else if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
- code = MetaServiceCode::INVALID_ARGUMENT;
- msg = fmt::format("recycle rowset key not found, key={}",
hex(recycle_rs_key));
- return 1;
+ return true;
} else {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get recycle rowset, err={}, key={}",
err,
hex(recycle_rs_key));
- return -1;
- }
- } else if (!config::enable_recycle_delete_rowset_key_check) {
- if (config::enable_tablet_job_check && !tablet_job_id.empty()) {
- if (!check_job_existed(txn, code, msg, instance_id, tablet_id,
rowset_id, tablet_job_id,
- is_versioned_read, resource_mgr)) {
- return 1;
- }
- }
-
- // Check if the commit rowset request is invalid.
- // If the transaction has been finished, it means this commit rowset
is a timeout retry request.
- // In this case, do not write the recycle key again, otherwise it may
cause data loss.
- // If the rowset had load id, it means it is a load request, otherwise
it is a
- // compaction/sc request.
- if (config::enable_load_txn_status_check && rowset_meta.has_load_id()
&&
- !check_transaction_status(TxnStatusPB::TXN_STATUS_PREPARED, txn,
instance_id,
- rowset_meta.txn_id(), code, msg)) {
- LOG(WARNING) << "commit rowset failed, txn_id=" <<
rowset_meta.txn_id()
- << ", tablet_id=" << tablet_id << ", rowset_id=" <<
rowset_id
- << ", rowset_state=" << rowset_meta.rowset_state() <<
", msg=" << msg;
- return 1;
+ return false;
}
+ } else {
+ // Old BE does not write recycle rowset keys for delete predicate
rowsets during
+ // commit_rowset, so treat the key as existing for compatibility.
+ *recycle_rs_key_exists = true;
}
- return 0;
+ return true;
}
/**
@@ -2732,16 +2704,54 @@ void
MetaServiceImpl::commit_rowset(::google::protobuf::RpcController* controlle
bool is_versioned_read = is_version_read_enabled(instance_id);
auto recycle_rs_key = recycle_rowset_key({instance_id, tablet_id,
rowset_id});
- if (check_idempotent_for_txn_or_job(txn.get(), recycle_rs_key,
rowset_meta, instance_id,
- tablet_id, rowset_id,
request->tablet_job_id(),
- is_versioned_read,
resource_mgr_.get(), code, msg) != 0) {
- return;
+ if (!config::enable_recycle_delete_rowset_key_check) {
+ std::string tablet_job_id = request->tablet_job_id();
+ if (config::enable_tablet_job_check && !tablet_job_id.empty()) {
+ if (!check_job_existed(txn.get(), code, msg, instance_id,
tablet_id, rowset_id,
+ tablet_job_id, is_versioned_read,
resource_mgr_.get())) {
+ return;
+ }
+ }
+
+ // Check if the commit rowset request is invalid.
+ // If the transaction has been finished, it means this commit rowset
is a timeout retry request.
+ // In this case, do not write the recycle key again, otherwise it may
cause data loss.
+ // If the rowset had load id, it means it is a load request, otherwise
it is a
+ // compaction/sc request.
+ if (config::enable_load_txn_status_check && rowset_meta.has_load_id()
&&
+ !check_transaction_status(TxnStatusPB::TXN_STATUS_PREPARED,
txn.get(), instance_id,
+ rowset_meta.txn_id(), code, msg)) {
+ LOG(WARNING) << "commit rowset failed, txn_id=" <<
rowset_meta.txn_id()
+ << ", tablet_id=" << tablet_id << ", rowset_id=" <<
rowset_id
+ << ", rowset_state=" << rowset_meta.rowset_state() <<
", msg=" << msg;
+ return;
+ }
}
// Check if commit key already exists.
std::string existed_commit_val;
err = txn->get(tmp_rs_key, &existed_commit_val);
if (err == TxnErrorCode::TXN_OK) {
+ if (config::enable_recycle_delete_rowset_key_check) {
+ bool recycle_rs_key_exists = false;
+ if (!check_recycle_rowset_key(txn.get(), recycle_rs_key,
rowset_meta,
+ &recycle_rs_key_exists, code, msg)) {
+ return;
+ }
+ if (recycle_rs_key_exists) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = fmt::format(
+ "tmp rowset key and recycle rowset key are mutually
exclusive, "
+ "tmp_rs_key={}, recycle_rs_key={}",
+ hex(tmp_rs_key), hex(recycle_rs_key));
+ LOG(INFO) << "skip commit rowset because tmp rowset key and
recycle rowset key are "
+ "mutually exclusive, txn_id="
+ << rowset_meta.txn_id() << ", tablet_id=" <<
tablet_id
+ << ", rowset_id=" << rowset_id << ", tmp_rs_key=" <<
hex(tmp_rs_key)
+ << ", recycle_rs_key=" << hex(recycle_rs_key) << ",
msg=" << msg;
+ return;
+ }
+ }
auto existed_rowset_meta = response->mutable_existed_rowset_meta();
if (!existed_rowset_meta->ParseFromString(existed_commit_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
@@ -2800,6 +2810,21 @@ void
MetaServiceImpl::commit_rowset(::google::protobuf::RpcController* controlle
msg = fmt::format("failed to check whether rowset exists, err={}",
err);
return;
}
+ if (config::enable_recycle_delete_rowset_key_check) {
+ bool recycle_rs_key_exists = false;
+ if (!check_recycle_rowset_key(txn.get(), recycle_rs_key, rowset_meta,
+ &recycle_rs_key_exists, code, msg)) {
+ return;
+ }
+ if (!recycle_rs_key_exists) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = fmt::format("recycle rowset key not found, key={}",
hex(recycle_rs_key));
+ LOG(INFO) << "skip commit rowset because recycle rowset key does
not exist, txn_id="
+ << rowset_meta.txn_id() << ", tablet_id=" << tablet_id
+ << ", rowset_id=" << rowset_id << ", recycle_rs_key=" <<
hex(recycle_rs_key);
+ return;
+ }
+ }
// write schema kv if rowset_meta has schema
if (config::write_schema_kv && rowset_meta.has_tablet_schema()) {
if (!rowset_meta.has_index_id() && !is_versioned_read) {
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index a158629efd1..afc76d01fc6 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -10977,9 +10977,7 @@ TEST(MetaServiceTest, StaleCommitRowset) {
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;
ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), rowset, res));
- ASSERT_TRUE(res.status().msg().find("recycle rowset key not found") !=
std::string::npos)
- << res.status().msg();
- ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT) <<
res.status().code();
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;
commit_txn(meta_service.get(), db_id, txn_id, label);
ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), rowset, res));
@@ -10988,6 +10986,89 @@ TEST(MetaServiceTest, StaleCommitRowset) {
ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT) <<
res.status().code();
}
+TEST(MetaServiceTest, CommitRowsetCheckTmpAndRecycleKeyExclusion) {
+ auto meta_service = get_meta_service();
+
+ const bool old_enable_recycle_delete_rowset_key_check =
+ config::enable_recycle_delete_rowset_key_check;
+ config::enable_recycle_delete_rowset_key_check = true;
+ DORIS_CLOUD_DEFER {
+ config::enable_recycle_delete_rowset_key_check =
old_enable_recycle_delete_rowset_key_check;
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
+
+ std::string instance_id = "commit_rowset_recycle_key_test_instance_id";
+ auto sp = SyncPoint::get_instance();
+ sp->set_call_back("get_instance_id", [&](auto&& args) {
+ auto* ret = try_any_cast_ret<std::string>(args);
+ ret->first = instance_id;
+ ret->second = true;
+ });
+ sp->enable_processing();
+
+ auto put_recycle_rowset = [&](const doris::RowsetMetaCloudPB& rowset) {
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn),
TxnErrorCode::TXN_OK);
+ RecycleRowsetPB recycle_rowset;
+ recycle_rowset.mutable_rowset_meta()->CopyFrom(rowset);
+ recycle_rowset.set_type(RecycleRowsetPB::PREPARE);
+ std::string recycle_rs_val;
+ ASSERT_TRUE(recycle_rowset.SerializeToString(&recycle_rs_val));
+ txn->put(recycle_rowset_key({instance_id, rowset.tablet_id(),
rowset.rowset_id_v2()}),
+ recycle_rs_val);
+ ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+ };
+
+ {
+ int64_t table_id = 1;
+ int64_t partition_id = 1;
+ int64_t tablet_id = 1;
+ int64_t db_id = 100201;
+ std::string label = "test_commit_rowset_tmp_and_recycle_key_conflict";
+ create_tablet(meta_service.get(), table_id, 1, partition_id,
tablet_id);
+
+ int64_t txn_id = 0;
+ ASSERT_NO_FATAL_FAILURE(begin_txn(meta_service.get(), db_id, label,
table_id, txn_id));
+ CreateRowsetResponse res;
+ auto rowset = create_rowset(txn_id, tablet_id, partition_id);
+ rowset.mutable_load_id()->set_hi(123);
+ rowset.mutable_load_id()->set_lo(456);
+ ASSERT_NO_FATAL_FAILURE(prepare_rowset(meta_service.get(), rowset,
res));
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;
+ res.Clear();
+ ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), rowset,
res));
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;
+ res.Clear();
+
+ ASSERT_NO_FATAL_FAILURE(put_recycle_rowset(rowset));
+ ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), rowset,
res));
+ ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT) <<
res.status().msg();
+ ASSERT_TRUE(res.status().msg().find("mutually exclusive") !=
std::string::npos)
+ << res.status().msg();
+ }
+
+ {
+ int64_t table_id = 2;
+ int64_t partition_id = 2;
+ int64_t tablet_id = 2;
+ int64_t db_id = 100202;
+ std::string label = "test_commit_rowset_without_tmp_or_recycle_key";
+ create_tablet(meta_service.get(), table_id, 1, partition_id,
tablet_id);
+
+ int64_t txn_id = 0;
+ ASSERT_NO_FATAL_FAILURE(begin_txn(meta_service.get(), db_id, label,
table_id, txn_id));
+ CreateRowsetResponse res;
+ auto rowset = create_rowset(txn_id, tablet_id, partition_id);
+ rowset.mutable_load_id()->set_hi(789);
+ rowset.mutable_load_id()->set_lo(101112);
+
+ ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), rowset,
res));
+ ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT) <<
res.status().msg();
+ ASSERT_TRUE(res.status().msg().find("recycle rowset key not found") !=
std::string::npos)
+ << res.status().msg();
+ }
+}
+
TEST(MetaServiceTest, AlterObjInfoTest) {
auto meta_service = get_meta_service();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]