This is an automated email from the ASF dual-hosted git repository.
liaoxin01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new fa91940f8b7 [fix](cloud) Validate recycle rowset key state during
commit rowset (#63985)
fa91940f8b7 is described below
commit fa91940f8b7d36ea0c28af0acec1446624472a80
Author: Yixuan Wang <[email protected]>
AuthorDate: Tue Jun 9 15:52:28 2026 +0800
[fix](cloud) Validate recycle rowset key state during commit rowset (#63985)
### What problem does this PR solve?
Problem Summary:
`commit_rowset` needs to distinguish two key states to preserve
idempotency and prevent invalid commits:
1. If the tmp rowset key already exists, the recycle rowset key must not
exist, because the two keys are mutually exclusive. If both exist, the
retry should fail.
2. If the tmp rowset key does not exist, the recycle rowset key (created
by `prepare_rowset`) must already exist before the tmp rowset key is
created. Otherwise, the request may bypass the prepare-rowset state and
commit an invalid rowset.
This change reorganizes the recycle rowset key check in `commit_rowset`
to first check the tmp rowset key, then validate the recycle rowset key
according to the tmp key state. It also simplifies the helper by
returning whether the recycle key check succeeds and exposing whether
the recycle key exists through an output parameter.
A unit test is added to cover both invalid states:
- tmp rowset key exists while recycle rowset key also exists
- tmp rowset key does not exist while recycle rowset key is missing
---
cloud/src/meta-service/meta_service.cpp | 125 +++++++++++++++++++-------------
cloud/test/meta_service_test.cpp | 87 +++++++++++++++++++++-
2 files changed, 159 insertions(+), 53 deletions(-)
diff --git a/cloud/src/meta-service/meta_service.cpp
b/cloud/src/meta-service/meta_service.cpp
index c6f423381a9..2fe48980f6f 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -2614,73 +2614,45 @@ void
MetaServiceImpl::prepare_rowset(::google::protobuf::RpcController* controll
}
}
-// Check recycle_rowset_key to ensure idempotency for commit_rowset operation.
-// The precondition for commit_rowset is that prepare_rowset has been
successfully executed,
-// which creates the recycle_rowset_key. Therefore, we only need to check if
the
-// recycle_rowset_key exists to determine if this is a duplicate request:
-// - If key not found: commit_rowset has already been executed and remove the
key,
-// this is a duplicate request and should be rejected.
-// - If key exists but marked as recycled: the rowset data has been recycled
by recycler,
-// this request should be rejected to prevent data inconsistency.
-// - If key exists and not marked: this is a valid commit_rowset request,
proceed normally.
-// Note: We don't need to check txn/job status separately because
prepare_rowset has already
-// validated them, and the existence of recycle_rowset_key is sufficient to
guarantee idempotency.
-int check_idempotent_for_txn_or_job(Transaction* txn, const std::string&
recycle_rs_key,
- doris::RowsetMetaCloudPB& rowset_meta,
- const std::string& instance_id, int64_t
tablet_id,
- const std::string& rowset_id, const
std::string& tablet_job_id,
- bool is_versioned_read, ResourceManager*
resource_mgr,
- MetaServiceCode& code, std::string& msg) {
- if (!rowset_meta.has_delete_predicate() &&
config::enable_recycle_delete_rowset_key_check) {
+// Returns false only when reading or parsing the recycle rowset key fails, or
when the rowset has
+// already been marked as recycled. recycle_rs_key_exists tells the caller
whether the key exists;
+// for rowsets with a delete predicate the key is not read and is always
reported as existing.
+bool check_recycle_rowset_key(Transaction* txn, const std::string&
recycle_rs_key,
+ const doris::RowsetMetaCloudPB& rowset_meta,
+ bool* recycle_rs_key_exists, MetaServiceCode&
code,
+ std::string& msg) {
+ // Default to "missing"; flipped to true only on a successful read or in the
+ // delete-predicate branch below.
+ *recycle_rs_key_exists = false;
+ if (!rowset_meta.has_delete_predicate()) {
std::string recycle_rs_val;
TxnErrorCode err = txn->get(recycle_rs_key, &recycle_rs_val);
if (err == TxnErrorCode::TXN_OK) {
+ *recycle_rs_key_exists = true;
RecycleRowsetPB recycle_rowset;
if (!recycle_rowset.ParseFromString(recycle_rs_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = fmt::format("malformed recycle rowset value. key={}",
hex(recycle_rs_key));
- return 1;
+ return false;
}
+ // NOTE(review): `auto` by value copies the RowsetMetaCloudPB; `const auto&`
+ // would avoid the copy.
auto rs_meta = recycle_rowset.rowset_meta();
if (rs_meta.has_is_recycled() && rs_meta.is_recycled()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = fmt::format("rowset has already been marked as recycled,
key={}, rs_meta={}",
hex(recycle_rs_key),
rs_meta.ShortDebugString());
- return 1;
+ return false;
}
} else if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
- code = MetaServiceCode::INVALID_ARGUMENT;
- msg = fmt::format("recycle rowset key not found, key={}",
hex(recycle_rs_key));
- return 1;
+ // A missing key is not an error here; commit_rowset decides what it means
+ // based on whether the tmp rowset key exists.
+ return true;
} else {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get recycle rowset, err={}, key={}",
err,
hex(recycle_rs_key));
- return -1;
- }
- } else if (!config::enable_recycle_delete_rowset_key_check) {
- if (config::enable_tablet_job_check && !tablet_job_id.empty()) {
- if (!check_job_existed(txn, code, msg, instance_id, tablet_id,
rowset_id, tablet_job_id,
- is_versioned_read, resource_mgr)) {
- return 1;
- }
- }
-
- // Check if the commit rowset request is invalid.
- // If the transaction has been finished, it means this commit rowset
is a timeout retry request.
- // In this case, do not write the recycle key again, otherwise it may
cause data loss.
- // If the rowset had load id, it means it is a load request, otherwise
it is a
- // compaction/sc request.
- if (config::enable_load_txn_status_check && rowset_meta.has_load_id()
&&
- !check_transaction_status(TxnStatusPB::TXN_STATUS_PREPARED, txn,
instance_id,
- rowset_meta.txn_id(), code, msg)) {
- LOG(WARNING) << "commit rowset failed, txn_id=" <<
rowset_meta.txn_id()
- << ", tablet_id=" << tablet_id << ", rowset_id=" <<
rowset_id
- << ", rowset_state=" << rowset_meta.rowset_state() <<
", msg=" << msg;
- return 1;
+ return false;
}
+ } else {
+ // Old BE does not write recycle rowset keys for delete predicate
rowsets during
+ // commit_rowset, so treat the key as existing for compatibility.
+ // NOTE(review): because this branch always reports the key as existing, a
+ // commit_rowset retry for a delete-predicate rowset whose tmp rowset key
+ // already exists is rejected by the caller's "mutually exclusive" check
+ // instead of taking the idempotent existing-rowset path. Confirm this is
+ // intended.
+ *recycle_rs_key_exists = true;
}
- return 0;
+ return true;
}
/**
@@ -2732,16 +2704,54 @@ void
MetaServiceImpl::commit_rowset(::google::protobuf::RpcController* controlle
bool is_versioned_read = is_version_read_enabled(instance_id);
auto recycle_rs_key = recycle_rowset_key({instance_id, tablet_id,
rowset_id});
- if (check_idempotent_for_txn_or_job(txn.get(), recycle_rs_key,
rowset_meta, instance_id,
- tablet_id, rowset_id,
request->tablet_job_id(),
- is_versioned_read,
resource_mgr_.get(), code, msg) != 0) {
- return;
+ if (!config::enable_recycle_delete_rowset_key_check) {
+ std::string tablet_job_id = request->tablet_job_id();
+ if (config::enable_tablet_job_check && !tablet_job_id.empty()) {
+ if (!check_job_existed(txn.get(), code, msg, instance_id,
tablet_id, rowset_id,
+ tablet_job_id, is_versioned_read,
resource_mgr_.get())) {
+ return;
+ }
+ }
+
+ // Check if the commit rowset request is invalid.
+ // If the transaction has been finished, it means this commit rowset
is a timeout retry request.
+ // In this case, do not write the recycle key again, otherwise it may
cause data loss.
+ // If the rowset had load id, it means it is a load request, otherwise
it is a
+ // compaction/sc request.
+ if (config::enable_load_txn_status_check && rowset_meta.has_load_id()
&&
+ !check_transaction_status(TxnStatusPB::TXN_STATUS_PREPARED,
txn.get(), instance_id,
+ rowset_meta.txn_id(), code, msg)) {
+ LOG(WARNING) << "commit rowset failed, txn_id=" <<
rowset_meta.txn_id()
+ << ", tablet_id=" << tablet_id << ", rowset_id=" <<
rowset_id
+ << ", rowset_state=" << rowset_meta.rowset_state() <<
", msg=" << msg;
+ return;
+ }
}
// Check if commit key already exists.
std::string existed_commit_val;
err = txn->get(tmp_rs_key, &existed_commit_val);
if (err == TxnErrorCode::TXN_OK) {
+ if (config::enable_recycle_delete_rowset_key_check) {
+ bool recycle_rs_key_exists = false;
+ if (!check_recycle_rowset_key(txn.get(), recycle_rs_key,
rowset_meta,
+ &recycle_rs_key_exists, code, msg)) {
+ return;
+ }
+ if (recycle_rs_key_exists) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = fmt::format(
+ "tmp rowset key and recycle rowset key are mutually
exclusive, "
+ "tmp_rs_key={}, recycle_rs_key={}",
+ hex(tmp_rs_key), hex(recycle_rs_key));
+ LOG(INFO) << "skip commit rowset because tmp rowset key and
recycle rowset key are "
+ "mutually exclusive, txn_id="
+ << rowset_meta.txn_id() << ", tablet_id=" <<
tablet_id
+ << ", rowset_id=" << rowset_id << ", tmp_rs_key=" <<
hex(tmp_rs_key)
+ << ", recycle_rs_key=" << hex(recycle_rs_key) << ",
msg=" << msg;
+ return;
+ }
+ }
auto existed_rowset_meta = response->mutable_existed_rowset_meta();
if (!existed_rowset_meta->ParseFromString(existed_commit_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
@@ -2800,6 +2810,21 @@ void
MetaServiceImpl::commit_rowset(::google::protobuf::RpcController* controlle
msg = fmt::format("failed to check whether rowset exists, err={}",
err);
return;
}
+ if (config::enable_recycle_delete_rowset_key_check) {
+ bool recycle_rs_key_exists = false;
+ if (!check_recycle_rowset_key(txn.get(), recycle_rs_key, rowset_meta,
+ &recycle_rs_key_exists, code, msg)) {
+ return;
+ }
+ if (!recycle_rs_key_exists) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = fmt::format("recycle rowset key not found, key={}",
hex(recycle_rs_key));
+ LOG(INFO) << "skip commit rowset because recycle rowset key does
not exist, txn_id="
+ << rowset_meta.txn_id() << ", tablet_id=" << tablet_id
+ << ", rowset_id=" << rowset_id << ", recycle_rs_key=" <<
hex(recycle_rs_key);
+ return;
+ }
+ }
// write schema kv if rowset_meta has schema
if (config::write_schema_kv && rowset_meta.has_tablet_schema()) {
if (!rowset_meta.has_index_id() && !is_versioned_read) {
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index 6f18493ee2e..8b17a385659 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -10977,9 +10977,7 @@ TEST(MetaServiceTest, StaleCommitRowset) {
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;
ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), rowset, res));
- ASSERT_TRUE(res.status().msg().find("recycle rowset key not found") !=
std::string::npos)
- << res.status().msg();
- ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT) <<
res.status().code();
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;
commit_txn(meta_service.get(), db_id, txn_id, label);
ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), rowset, res));
@@ -10988,6 +10986,89 @@ TEST(MetaServiceTest, StaleCommitRowset) {
ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT) <<
res.status().code();
}
+// Verifies that, with enable_recycle_delete_rowset_key_check on, commit_rowset rejects
+// (1) a retry when both the tmp rowset key and the recycle rowset key exist, and
+// (2) a commit when neither key exists (no prepare_rowset was performed).
+TEST(MetaServiceTest, CommitRowsetCheckTmpAndRecycleKeyExclusion) {
+ auto meta_service = get_meta_service();
+
+ // Force the recycle-key validation path; the original value is restored on exit.
+ const bool old_enable_recycle_delete_rowset_key_check =
+ config::enable_recycle_delete_rowset_key_check;
+ config::enable_recycle_delete_rowset_key_check = true;
+ DORIS_CLOUD_DEFER {
+ config::enable_recycle_delete_rowset_key_check =
old_enable_recycle_delete_rowset_key_check;
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
+
+ std::string instance_id = "commit_rowset_recycle_key_test_instance_id";
+ auto sp = SyncPoint::get_instance();
+ sp->set_call_back("get_instance_id", [&](auto&& args) {
+ auto* ret = try_any_cast_ret<std::string>(args);
+ ret->first = instance_id;
+ ret->second = true;
+ });
+ sp->enable_processing();
+
+ // Writes a PREPARE-type recycle rowset key for `rowset` directly into the KV store.
+ auto put_recycle_rowset = [&](const doris::RowsetMetaCloudPB& rowset) {
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn),
TxnErrorCode::TXN_OK);
+ RecycleRowsetPB recycle_rowset;
+ recycle_rowset.mutable_rowset_meta()->CopyFrom(rowset);
+ recycle_rowset.set_type(RecycleRowsetPB::PREPARE);
+ std::string recycle_rs_val;
+ ASSERT_TRUE(recycle_rowset.SerializeToString(&recycle_rs_val));
+ txn->put(recycle_rowset_key({instance_id, rowset.tablet_id(),
rowset.rowset_id_v2()}),
+ recycle_rs_val);
+ ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+ };
+
+ // Case 1: prepare_rowset and commit_rowset succeed, then a recycle key is written
+ // again; a retried commit_rowset must fail with a "mutually exclusive" error.
+ {
+ int64_t table_id = 1;
+ int64_t partition_id = 1;
+ int64_t tablet_id = 1;
+ int64_t db_id = 100201;
+ std::string label = "test_commit_rowset_tmp_and_recycle_key_conflict";
+ create_tablet(meta_service.get(), table_id, 1, partition_id,
tablet_id);
+
+ int64_t txn_id = 0;
+ ASSERT_NO_FATAL_FAILURE(begin_txn(meta_service.get(), db_id, label,
table_id, txn_id));
+ CreateRowsetResponse res;
+ auto rowset = create_rowset(txn_id, tablet_id, partition_id);
+ rowset.mutable_load_id()->set_hi(123);
+ rowset.mutable_load_id()->set_lo(456);
+ ASSERT_NO_FATAL_FAILURE(prepare_rowset(meta_service.get(), rowset,
res));
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;
+ res.Clear();
+ ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), rowset,
res));
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;
+ res.Clear();
+
+ ASSERT_NO_FATAL_FAILURE(put_recycle_rowset(rowset));
+ ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), rowset,
res));
+ ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT) <<
res.status().msg();
+ ASSERT_TRUE(res.status().msg().find("mutually exclusive") !=
std::string::npos)
+ << res.status().msg();
+ }
+
+ // Case 2: commit_rowset without a prior prepare_rowset, so neither key exists;
+ // it must fail with "recycle rowset key not found".
+ {
+ int64_t table_id = 2;
+ int64_t partition_id = 2;
+ int64_t tablet_id = 2;
+ int64_t db_id = 100202;
+ std::string label = "test_commit_rowset_without_tmp_or_recycle_key";
+ create_tablet(meta_service.get(), table_id, 1, partition_id,
tablet_id);
+
+ int64_t txn_id = 0;
+ ASSERT_NO_FATAL_FAILURE(begin_txn(meta_service.get(), db_id, label,
table_id, txn_id));
+ CreateRowsetResponse res;
+ auto rowset = create_rowset(txn_id, tablet_id, partition_id);
+ rowset.mutable_load_id()->set_hi(789);
+ rowset.mutable_load_id()->set_lo(101112);
+
+ ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), rowset,
res));
+ ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT) <<
res.status().msg();
+ ASSERT_TRUE(res.status().msg().find("recycle rowset key not found") !=
std::string::npos)
+ << res.status().msg();
+ }
+}
+
TEST(MetaServiceTest, AlterObjInfoTest) {
auto meta_service = get_meta_service();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]