This is an automated email from the ASF dual-hosted git repository.

liaoxin01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new fa91940f8b7 [fix](cloud) Validate recycle rowset key state during commit rowset (#63985)
fa91940f8b7 is described below

commit fa91940f8b7d36ea0c28af0acec1446624472a80
Author: Yixuan Wang <[email protected]>
AuthorDate: Tue Jun 9 15:52:28 2026 +0800

    [fix](cloud) Validate recycle rowset key state during commit rowset (#63985)
    
    ### What problem does this PR solve?
    Problem Summary:
    
    `commit_rowset` needs to distinguish two key states to preserve
    idempotency and prevent invalid commits:
    
    1. If the tmp rowset key already exists, the recycle rowset key must not
    exist: the two keys are mutually exclusive, so a retry that finds both
    keys should fail.
    2. If the tmp rowset key does not exist, the recycle rowset key must
    already exist before the tmp rowset key is created. Otherwise, the
    request may have bypassed `prepare_rowset` (which creates the recycle
    rowset key) and would commit an invalid rowset.
    
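    This change reorganizes the recycle rowset key check in `commit_rowset`
    to first look up the tmp rowset key and then validate the recycle rowset
    key according to whether the tmp key exists. It also simplifies the
    helper (`check_idempotent_for_txn_or_job` is replaced by
    `check_recycle_rowset_key`), which now returns whether the recycle key
    check succeeded and reports whether the recycle key exists through an
    output parameter. The legacy tablet-job and load-txn status checks, used
    when `enable_recycle_delete_rowset_key_check` is disabled, are moved
    inline into `commit_rowset`.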
    
    A unit test (`CommitRowsetCheckTmpAndRecycleKeyExclusion`) is added to
    cover both invalid states:
    - tmp rowset key exists while recycle rowset key also exists
    - tmp rowset key does not exist while recycle rowset key is missing
---
 cloud/src/meta-service/meta_service.cpp | 125 +++++++++++++++++++-------------
 cloud/test/meta_service_test.cpp        |  87 +++++++++++++++++++++-
 2 files changed, 159 insertions(+), 53 deletions(-)

diff --git a/cloud/src/meta-service/meta_service.cpp 
b/cloud/src/meta-service/meta_service.cpp
index c6f423381a9..2fe48980f6f 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -2614,73 +2614,45 @@ void 
MetaServiceImpl::prepare_rowset(::google::protobuf::RpcController* controll
     }
 }
 
-// Check recycle_rowset_key to ensure idempotency for commit_rowset operation.
-// The precondition for commit_rowset is that prepare_rowset has been 
successfully executed,
-// which creates the recycle_rowset_key. Therefore, we only need to check if 
the
-// recycle_rowset_key exists to determine if this is a duplicate request:
-// - If key not found: commit_rowset has already been executed and remove the 
key,
-//   this is a duplicate request and should be rejected.
-// - If key exists but marked as recycled: the rowset data has been recycled 
by recycler,
-//   this request should be rejected to prevent data inconsistency.
-// - If key exists and not marked: this is a valid commit_rowset request, 
proceed normally.
-// Note: We don't need to check txn/job status separately because 
prepare_rowset has already
-// validated them, and the existence of recycle_rowset_key is sufficient to 
guarantee idempotency.
-int check_idempotent_for_txn_or_job(Transaction* txn, const std::string& 
recycle_rs_key,
-                                    doris::RowsetMetaCloudPB& rowset_meta,
-                                    const std::string& instance_id, int64_t 
tablet_id,
-                                    const std::string& rowset_id, const 
std::string& tablet_job_id,
-                                    bool is_versioned_read, ResourceManager* 
resource_mgr,
-                                    MetaServiceCode& code, std::string& msg) {
-    if (!rowset_meta.has_delete_predicate() && 
config::enable_recycle_delete_rowset_key_check) {
+// Returns false only when reading or parsing the recycle rowset key fails, or 
when the rowset has
+// already been marked as recycled. recycle_rs_key_exists tells the caller 
whether the key exists.
+bool check_recycle_rowset_key(Transaction* txn, const std::string& 
recycle_rs_key,
+                              const doris::RowsetMetaCloudPB& rowset_meta,
+                              bool* recycle_rs_key_exists, MetaServiceCode& 
code,
+                              std::string& msg) {
+    *recycle_rs_key_exists = false;
+    if (!rowset_meta.has_delete_predicate()) {
         std::string recycle_rs_val;
         TxnErrorCode err = txn->get(recycle_rs_key, &recycle_rs_val);
         if (err == TxnErrorCode::TXN_OK) {
+            *recycle_rs_key_exists = true;
             RecycleRowsetPB recycle_rowset;
             if (!recycle_rowset.ParseFromString(recycle_rs_val)) {
                 code = MetaServiceCode::PROTOBUF_PARSE_ERR;
                 msg = fmt::format("malformed recycle rowset value. key={}", 
hex(recycle_rs_key));
-                return 1;
+                return false;
             }
             auto rs_meta = recycle_rowset.rowset_meta();
             if (rs_meta.has_is_recycled() && rs_meta.is_recycled()) {
                 code = MetaServiceCode::INVALID_ARGUMENT;
                 msg = fmt::format("rowset has already been marked as recycled, 
key={}, rs_meta={}",
                                   hex(recycle_rs_key), 
rs_meta.ShortDebugString());
-                return 1;
+                return false;
             }
         } else if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
-            code = MetaServiceCode::INVALID_ARGUMENT;
-            msg = fmt::format("recycle rowset key not found, key={}", 
hex(recycle_rs_key));
-            return 1;
+            return true;
         } else {
             code = cast_as<ErrCategory::READ>(err);
             msg = fmt::format("failed to get recycle rowset, err={}, key={}", 
err,
                               hex(recycle_rs_key));
-            return -1;
-        }
-    } else if (!config::enable_recycle_delete_rowset_key_check) {
-        if (config::enable_tablet_job_check && !tablet_job_id.empty()) {
-            if (!check_job_existed(txn, code, msg, instance_id, tablet_id, 
rowset_id, tablet_job_id,
-                                   is_versioned_read, resource_mgr)) {
-                return 1;
-            }
-        }
-
-        // Check if the commit rowset request is invalid.
-        // If the transaction has been finished, it means this commit rowset 
is a timeout retry request.
-        // In this case, do not write the recycle key again, otherwise it may 
cause data loss.
-        // If the rowset had load id, it means it is a load request, otherwise 
it is a
-        // compaction/sc request.
-        if (config::enable_load_txn_status_check && rowset_meta.has_load_id() 
&&
-            !check_transaction_status(TxnStatusPB::TXN_STATUS_PREPARED, txn, 
instance_id,
-                                      rowset_meta.txn_id(), code, msg)) {
-            LOG(WARNING) << "commit rowset failed, txn_id=" << 
rowset_meta.txn_id()
-                         << ", tablet_id=" << tablet_id << ", rowset_id=" << 
rowset_id
-                         << ", rowset_state=" << rowset_meta.rowset_state() << 
", msg=" << msg;
-            return 1;
+            return false;
         }
+    } else {
+        // Old BE does not write recycle rowset keys for delete predicate 
rowsets during
+        // commit_rowset, so treat the key as existing for compatibility.
+        *recycle_rs_key_exists = true;
     }
-    return 0;
+    return true;
 }
 
 /**
@@ -2732,16 +2704,54 @@ void 
MetaServiceImpl::commit_rowset(::google::protobuf::RpcController* controlle
     bool is_versioned_read = is_version_read_enabled(instance_id);
     auto recycle_rs_key = recycle_rowset_key({instance_id, tablet_id, 
rowset_id});
 
-    if (check_idempotent_for_txn_or_job(txn.get(), recycle_rs_key, 
rowset_meta, instance_id,
-                                        tablet_id, rowset_id, 
request->tablet_job_id(),
-                                        is_versioned_read, 
resource_mgr_.get(), code, msg) != 0) {
-        return;
+    if (!config::enable_recycle_delete_rowset_key_check) {
+        std::string tablet_job_id = request->tablet_job_id();
+        if (config::enable_tablet_job_check && !tablet_job_id.empty()) {
+            if (!check_job_existed(txn.get(), code, msg, instance_id, 
tablet_id, rowset_id,
+                                   tablet_job_id, is_versioned_read, 
resource_mgr_.get())) {
+                return;
+            }
+        }
+
+        // Check if the commit rowset request is invalid.
+        // If the transaction has been finished, it means this commit rowset 
is a timeout retry request.
+        // In this case, do not write the recycle key again, otherwise it may 
cause data loss.
+        // If the rowset had load id, it means it is a load request, otherwise 
it is a
+        // compaction/sc request.
+        if (config::enable_load_txn_status_check && rowset_meta.has_load_id() 
&&
+            !check_transaction_status(TxnStatusPB::TXN_STATUS_PREPARED, 
txn.get(), instance_id,
+                                      rowset_meta.txn_id(), code, msg)) {
+            LOG(WARNING) << "commit rowset failed, txn_id=" << 
rowset_meta.txn_id()
+                         << ", tablet_id=" << tablet_id << ", rowset_id=" << 
rowset_id
+                         << ", rowset_state=" << rowset_meta.rowset_state() << 
", msg=" << msg;
+            return;
+        }
     }
 
     // Check if commit key already exists.
     std::string existed_commit_val;
     err = txn->get(tmp_rs_key, &existed_commit_val);
     if (err == TxnErrorCode::TXN_OK) {
+        if (config::enable_recycle_delete_rowset_key_check) {
+            bool recycle_rs_key_exists = false;
+            if (!check_recycle_rowset_key(txn.get(), recycle_rs_key, 
rowset_meta,
+                                          &recycle_rs_key_exists, code, msg)) {
+                return;
+            }
+            if (recycle_rs_key_exists) {
+                code = MetaServiceCode::INVALID_ARGUMENT;
+                msg = fmt::format(
+                        "tmp rowset key and recycle rowset key are mutually 
exclusive, "
+                        "tmp_rs_key={}, recycle_rs_key={}",
+                        hex(tmp_rs_key), hex(recycle_rs_key));
+                LOG(INFO) << "skip commit rowset because tmp rowset key and 
recycle rowset key are "
+                             "mutually exclusive, txn_id="
+                          << rowset_meta.txn_id() << ", tablet_id=" << 
tablet_id
+                          << ", rowset_id=" << rowset_id << ", tmp_rs_key=" << 
hex(tmp_rs_key)
+                          << ", recycle_rs_key=" << hex(recycle_rs_key) << ", 
msg=" << msg;
+                return;
+            }
+        }
         auto existed_rowset_meta = response->mutable_existed_rowset_meta();
         if (!existed_rowset_meta->ParseFromString(existed_commit_val)) {
             code = MetaServiceCode::PROTOBUF_PARSE_ERR;
@@ -2800,6 +2810,21 @@ void 
MetaServiceImpl::commit_rowset(::google::protobuf::RpcController* controlle
         msg = fmt::format("failed to check whether rowset exists, err={}", 
err);
         return;
     }
+    if (config::enable_recycle_delete_rowset_key_check) {
+        bool recycle_rs_key_exists = false;
+        if (!check_recycle_rowset_key(txn.get(), recycle_rs_key, rowset_meta,
+                                      &recycle_rs_key_exists, code, msg)) {
+            return;
+        }
+        if (!recycle_rs_key_exists) {
+            code = MetaServiceCode::INVALID_ARGUMENT;
+            msg = fmt::format("recycle rowset key not found, key={}", 
hex(recycle_rs_key));
+            LOG(INFO) << "skip commit rowset because recycle rowset key does 
not exist, txn_id="
+                      << rowset_meta.txn_id() << ", tablet_id=" << tablet_id
+                      << ", rowset_id=" << rowset_id << ", recycle_rs_key=" << 
hex(recycle_rs_key);
+            return;
+        }
+    }
     // write schema kv if rowset_meta has schema
     if (config::write_schema_kv && rowset_meta.has_tablet_schema()) {
         if (!rowset_meta.has_index_id() && !is_versioned_read) {
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index 6f18493ee2e..8b17a385659 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -10977,9 +10977,7 @@ TEST(MetaServiceTest, StaleCommitRowset) {
     ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;
 
     ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), rowset, res));
-    ASSERT_TRUE(res.status().msg().find("recycle rowset key not found") != 
std::string::npos)
-            << res.status().msg();
-    ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT) << 
res.status().code();
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;
 
     commit_txn(meta_service.get(), db_id, txn_id, label);
     ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), rowset, res));
@@ -10988,6 +10986,89 @@ TEST(MetaServiceTest, StaleCommitRowset) {
     ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT) << 
res.status().code();
 }
 
+TEST(MetaServiceTest, CommitRowsetCheckTmpAndRecycleKeyExclusion) {
+    auto meta_service = get_meta_service();
+
+    const bool old_enable_recycle_delete_rowset_key_check =
+            config::enable_recycle_delete_rowset_key_check;
+    config::enable_recycle_delete_rowset_key_check = true;
+    DORIS_CLOUD_DEFER {
+        config::enable_recycle_delete_rowset_key_check = 
old_enable_recycle_delete_rowset_key_check;
+        SyncPoint::get_instance()->clear_all_call_backs();
+    };
+
+    std::string instance_id = "commit_rowset_recycle_key_test_instance_id";
+    auto sp = SyncPoint::get_instance();
+    sp->set_call_back("get_instance_id", [&](auto&& args) {
+        auto* ret = try_any_cast_ret<std::string>(args);
+        ret->first = instance_id;
+        ret->second = true;
+    });
+    sp->enable_processing();
+
+    auto put_recycle_rowset = [&](const doris::RowsetMetaCloudPB& rowset) {
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), 
TxnErrorCode::TXN_OK);
+        RecycleRowsetPB recycle_rowset;
+        recycle_rowset.mutable_rowset_meta()->CopyFrom(rowset);
+        recycle_rowset.set_type(RecycleRowsetPB::PREPARE);
+        std::string recycle_rs_val;
+        ASSERT_TRUE(recycle_rowset.SerializeToString(&recycle_rs_val));
+        txn->put(recycle_rowset_key({instance_id, rowset.tablet_id(), 
rowset.rowset_id_v2()}),
+                 recycle_rs_val);
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+    };
+
+    {
+        int64_t table_id = 1;
+        int64_t partition_id = 1;
+        int64_t tablet_id = 1;
+        int64_t db_id = 100201;
+        std::string label = "test_commit_rowset_tmp_and_recycle_key_conflict";
+        create_tablet(meta_service.get(), table_id, 1, partition_id, 
tablet_id);
+
+        int64_t txn_id = 0;
+        ASSERT_NO_FATAL_FAILURE(begin_txn(meta_service.get(), db_id, label, 
table_id, txn_id));
+        CreateRowsetResponse res;
+        auto rowset = create_rowset(txn_id, tablet_id, partition_id);
+        rowset.mutable_load_id()->set_hi(123);
+        rowset.mutable_load_id()->set_lo(456);
+        ASSERT_NO_FATAL_FAILURE(prepare_rowset(meta_service.get(), rowset, 
res));
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;
+        res.Clear();
+        ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), rowset, 
res));
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;
+        res.Clear();
+
+        ASSERT_NO_FATAL_FAILURE(put_recycle_rowset(rowset));
+        ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), rowset, 
res));
+        ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT) << 
res.status().msg();
+        ASSERT_TRUE(res.status().msg().find("mutually exclusive") != 
std::string::npos)
+                << res.status().msg();
+    }
+
+    {
+        int64_t table_id = 2;
+        int64_t partition_id = 2;
+        int64_t tablet_id = 2;
+        int64_t db_id = 100202;
+        std::string label = "test_commit_rowset_without_tmp_or_recycle_key";
+        create_tablet(meta_service.get(), table_id, 1, partition_id, 
tablet_id);
+
+        int64_t txn_id = 0;
+        ASSERT_NO_FATAL_FAILURE(begin_txn(meta_service.get(), db_id, label, 
table_id, txn_id));
+        CreateRowsetResponse res;
+        auto rowset = create_rowset(txn_id, tablet_id, partition_id);
+        rowset.mutable_load_id()->set_hi(789);
+        rowset.mutable_load_id()->set_lo(101112);
+
+        ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), rowset, 
res));
+        ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT) << 
res.status().msg();
+        ASSERT_TRUE(res.status().msg().find("recycle rowset key not found") != 
std::string::npos)
+                << res.status().msg();
+    }
+}
+
 TEST(MetaServiceTest, AlterObjInfoTest) {
     auto meta_service = get_meta_service();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to