This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 8b6044bb77d branch-4.1: [fix](cloud) Validate recycle rowset key state during commit rowset #63985 (#64287)
8b6044bb77d is described below

commit 8b6044bb77dcce64bcf64698dec1bd8f06919af4
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jun 10 11:40:53 2026 +0800

    branch-4.1: [fix](cloud) Validate recycle rowset key state during commit rowset #63985 (#64287)
    
    Cherry-picked from #63985
    
    Co-authored-by: Yixuan Wang <[email protected]>
---
 cloud/src/meta-service/meta_service.cpp | 125 +++++++++++++++++++-------------
 cloud/test/meta_service_test.cpp        |  87 +++++++++++++++++++++-
 2 files changed, 159 insertions(+), 53 deletions(-)

diff --git a/cloud/src/meta-service/meta_service.cpp 
b/cloud/src/meta-service/meta_service.cpp
index c6f423381a9..2fe48980f6f 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -2614,73 +2614,45 @@ void 
MetaServiceImpl::prepare_rowset(::google::protobuf::RpcController* controll
     }
 }
 
-// Check recycle_rowset_key to ensure idempotency for commit_rowset operation.
-// The precondition for commit_rowset is that prepare_rowset has been 
successfully executed,
-// which creates the recycle_rowset_key. Therefore, we only need to check if 
the
-// recycle_rowset_key exists to determine if this is a duplicate request:
-// - If key not found: commit_rowset has already been executed and remove the 
key,
-//   this is a duplicate request and should be rejected.
-// - If key exists but marked as recycled: the rowset data has been recycled 
by recycler,
-//   this request should be rejected to prevent data inconsistency.
-// - If key exists and not marked: this is a valid commit_rowset request, 
proceed normally.
-// Note: We don't need to check txn/job status separately because 
prepare_rowset has already
-// validated them, and the existence of recycle_rowset_key is sufficient to 
guarantee idempotency.
-int check_idempotent_for_txn_or_job(Transaction* txn, const std::string& 
recycle_rs_key,
-                                    doris::RowsetMetaCloudPB& rowset_meta,
-                                    const std::string& instance_id, int64_t 
tablet_id,
-                                    const std::string& rowset_id, const 
std::string& tablet_job_id,
-                                    bool is_versioned_read, ResourceManager* 
resource_mgr,
-                                    MetaServiceCode& code, std::string& msg) {
-    if (!rowset_meta.has_delete_predicate() && 
config::enable_recycle_delete_rowset_key_check) {
+// Returns false, with code/msg set, when reading or parsing the recycle rowset key fails or the
+// rowset is marked recycled. Otherwise *recycle_rs_key_exists tells whether the key exists.
+bool check_recycle_rowset_key(Transaction* txn, const std::string& 
recycle_rs_key,
+                              const doris::RowsetMetaCloudPB& rowset_meta,
+                              bool* recycle_rs_key_exists, MetaServiceCode& 
code,
+                              std::string& msg) {
+    *recycle_rs_key_exists = false;
+    if (!rowset_meta.has_delete_predicate()) {
         std::string recycle_rs_val;
         TxnErrorCode err = txn->get(recycle_rs_key, &recycle_rs_val);
         if (err == TxnErrorCode::TXN_OK) {
+            *recycle_rs_key_exists = true;
             RecycleRowsetPB recycle_rowset;
             if (!recycle_rowset.ParseFromString(recycle_rs_val)) {
                 code = MetaServiceCode::PROTOBUF_PARSE_ERR;
                 msg = fmt::format("malformed recycle rowset value. key={}", 
hex(recycle_rs_key));
-                return 1;
+                return false;
             }
             auto rs_meta = recycle_rowset.rowset_meta();
             if (rs_meta.has_is_recycled() && rs_meta.is_recycled()) {
                 code = MetaServiceCode::INVALID_ARGUMENT;
                 msg = fmt::format("rowset has already been marked as recycled, 
key={}, rs_meta={}",
                                   hex(recycle_rs_key), 
rs_meta.ShortDebugString());
-                return 1;
+                return false;
             }
         } else if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
-            code = MetaServiceCode::INVALID_ARGUMENT;
-            msg = fmt::format("recycle rowset key not found, key={}", 
hex(recycle_rs_key));
-            return 1;
+            return true;
         } else {
             code = cast_as<ErrCategory::READ>(err);
             msg = fmt::format("failed to get recycle rowset, err={}, key={}", 
err,
                               hex(recycle_rs_key));
-            return -1;
-        }
-    } else if (!config::enable_recycle_delete_rowset_key_check) {
-        if (config::enable_tablet_job_check && !tablet_job_id.empty()) {
-            if (!check_job_existed(txn, code, msg, instance_id, tablet_id, 
rowset_id, tablet_job_id,
-                                   is_versioned_read, resource_mgr)) {
-                return 1;
-            }
-        }
-
-        // Check if the commit rowset request is invalid.
-        // If the transaction has been finished, it means this commit rowset 
is a timeout retry request.
-        // In this case, do not write the recycle key again, otherwise it may 
cause data loss.
-        // If the rowset had load id, it means it is a load request, otherwise 
it is a
-        // compaction/sc request.
-        if (config::enable_load_txn_status_check && rowset_meta.has_load_id() 
&&
-            !check_transaction_status(TxnStatusPB::TXN_STATUS_PREPARED, txn, 
instance_id,
-                                      rowset_meta.txn_id(), code, msg)) {
-            LOG(WARNING) << "commit rowset failed, txn_id=" << 
rowset_meta.txn_id()
-                         << ", tablet_id=" << tablet_id << ", rowset_id=" << 
rowset_id
-                         << ", rowset_state=" << rowset_meta.rowset_state() << 
", msg=" << msg;
-            return 1;
+            return false;
         }
+    } else {
+        // Old BE does not write recycle rowset keys for delete predicate 
rowsets during
+        // commit_rowset, so treat the key as existing for compatibility.
+        *recycle_rs_key_exists = true;
     }
-    return 0;
+    return true;
 }
 
 /**
@@ -2732,16 +2704,56 @@ void MetaServiceImpl::commit_rowset(::google::protobuf::RpcController* controlle
     bool is_versioned_read = is_version_read_enabled(instance_id);
     auto recycle_rs_key = recycle_rowset_key({instance_id, tablet_id, rowset_id});

-    if (check_idempotent_for_txn_or_job(txn.get(), recycle_rs_key, rowset_meta, instance_id,
-                                        tablet_id, rowset_id, request->tablet_job_id(),
-                                        is_versioned_read, resource_mgr_.get(), code, msg) != 0) {
-        return;
+    if (!config::enable_recycle_delete_rowset_key_check) {
+        std::string tablet_job_id = request->tablet_job_id();
+        if (config::enable_tablet_job_check && !tablet_job_id.empty()) {
+            if (!check_job_existed(txn.get(), code, msg, instance_id, tablet_id, rowset_id,
+                                   tablet_job_id, is_versioned_read, resource_mgr_.get())) {
+                return;
+            }
+        }
+
+        // Check if the commit rowset request is invalid.
+        // If the transaction has been finished, it means this commit rowset is a timeout retry request.
+        // In this case, do not write the recycle key again, otherwise it may cause data loss.
+        // If the rowset had load id, it means it is a load request, otherwise it is a
+        // compaction/sc request.
+        if (config::enable_load_txn_status_check && rowset_meta.has_load_id() &&
+            !check_transaction_status(TxnStatusPB::TXN_STATUS_PREPARED, txn.get(), instance_id,
+                                      rowset_meta.txn_id(), code, msg)) {
+            LOG(WARNING) << "commit rowset failed, txn_id=" << rowset_meta.txn_id()
+                         << ", tablet_id=" << tablet_id << ", rowset_id=" << rowset_id
+                         << ", rowset_state=" << rowset_meta.rowset_state() << ", msg=" << msg;
+            return;
+        }
     }

     // Check if commit key already exists.
     std::string existed_commit_val;
     err = txn->get(tmp_rs_key, &existed_commit_val);
     if (err == TxnErrorCode::TXN_OK) {
+        if (config::enable_recycle_delete_rowset_key_check) {
+            bool recycle_rs_key_exists = false;
+            if (!check_recycle_rowset_key(txn.get(), recycle_rs_key, rowset_meta,
+                                          &recycle_rs_key_exists, code, msg)) {
+                return;
+            }
+            // Delete-predicate rowsets are reported as having a recycle key without it being read
+            // (compatibility with old BE), so they must not trip the exclusion check on retries.
+            if (recycle_rs_key_exists && !rowset_meta.has_delete_predicate()) {
+                code = MetaServiceCode::INVALID_ARGUMENT;
+                msg = fmt::format(
+                        "tmp rowset key and recycle rowset key are mutually exclusive, "
+                        "tmp_rs_key={}, recycle_rs_key={}",
+                        hex(tmp_rs_key), hex(recycle_rs_key));
+                LOG(INFO) << "skip commit rowset because tmp rowset key and recycle rowset key are "
+                             "mutually exclusive, txn_id="
+                          << rowset_meta.txn_id() << ", tablet_id=" << tablet_id
+                          << ", rowset_id=" << rowset_id << ", tmp_rs_key=" << hex(tmp_rs_key)
+                          << ", recycle_rs_key=" << hex(recycle_rs_key) << ", msg=" << msg;
+                return;
+            }
+        }
         auto existed_rowset_meta = response->mutable_existed_rowset_meta();
         if (!existed_rowset_meta->ParseFromString(existed_commit_val)) {
             code = MetaServiceCode::PROTOBUF_PARSE_ERR;
@@ -2800,6 +2810,21 @@ void 
MetaServiceImpl::commit_rowset(::google::protobuf::RpcController* controlle
         msg = fmt::format("failed to check whether rowset exists, err={}", 
err);
         return;
     }
+    if (config::enable_recycle_delete_rowset_key_check) {
+        bool recycle_rs_key_exists = false;
+        if (!check_recycle_rowset_key(txn.get(), recycle_rs_key, rowset_meta,
+                                      &recycle_rs_key_exists, code, msg)) {
+            return;
+        }
+        if (!recycle_rs_key_exists) {
+            code = MetaServiceCode::INVALID_ARGUMENT;
+            msg = fmt::format("recycle rowset key not found, key={}", 
hex(recycle_rs_key));
+            LOG(INFO) << "skip commit rowset because recycle rowset key does 
not exist, txn_id="
+                      << rowset_meta.txn_id() << ", tablet_id=" << tablet_id
+                      << ", rowset_id=" << rowset_id << ", recycle_rs_key=" << 
hex(recycle_rs_key);
+            return;
+        }
+    }
     // write schema kv if rowset_meta has schema
     if (config::write_schema_kv && rowset_meta.has_tablet_schema()) {
         if (!rowset_meta.has_index_id() && !is_versioned_read) {
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index a158629efd1..afc76d01fc6 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -10977,9 +10977,7 @@ TEST(MetaServiceTest, StaleCommitRowset) {
     ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;
 
     ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), rowset, res));
-    ASSERT_TRUE(res.status().msg().find("recycle rowset key not found") != 
std::string::npos)
-            << res.status().msg();
-    ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT) << 
res.status().code();
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;
 
     commit_txn(meta_service.get(), db_id, txn_id, label);
     ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), rowset, res));
@@ -10988,6 +10986,89 @@ TEST(MetaServiceTest, StaleCommitRowset) {
     ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT) << 
res.status().code();
 }
 
+TEST(MetaServiceTest, CommitRowsetCheckTmpAndRecycleKeyExclusion) {
+    auto meta_service = get_meta_service();
+
+    const bool old_enable_recycle_delete_rowset_key_check =
+            config::enable_recycle_delete_rowset_key_check;
+    config::enable_recycle_delete_rowset_key_check = true;
+    DORIS_CLOUD_DEFER {
+        config::enable_recycle_delete_rowset_key_check = 
old_enable_recycle_delete_rowset_key_check;
+        SyncPoint::get_instance()->clear_all_call_backs();
+    };
+
+    std::string instance_id = "commit_rowset_recycle_key_test_instance_id";
+    auto sp = SyncPoint::get_instance();
+    sp->set_call_back("get_instance_id", [&](auto&& args) {
+        auto* ret = try_any_cast_ret<std::string>(args);
+        ret->first = instance_id;
+        ret->second = true;
+    });
+    sp->enable_processing();
+
+    auto put_recycle_rowset = [&](const doris::RowsetMetaCloudPB& rowset) {
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), 
TxnErrorCode::TXN_OK);
+        RecycleRowsetPB recycle_rowset;
+        recycle_rowset.mutable_rowset_meta()->CopyFrom(rowset);
+        recycle_rowset.set_type(RecycleRowsetPB::PREPARE);
+        std::string recycle_rs_val;
+        ASSERT_TRUE(recycle_rowset.SerializeToString(&recycle_rs_val));
+        txn->put(recycle_rowset_key({instance_id, rowset.tablet_id(), 
rowset.rowset_id_v2()}),
+                 recycle_rs_val);
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+    };
+
+    {
+        int64_t table_id = 1;
+        int64_t partition_id = 1;
+        int64_t tablet_id = 1;
+        int64_t db_id = 100201;
+        std::string label = "test_commit_rowset_tmp_and_recycle_key_conflict";
+        create_tablet(meta_service.get(), table_id, 1, partition_id, 
tablet_id);
+
+        int64_t txn_id = 0;
+        ASSERT_NO_FATAL_FAILURE(begin_txn(meta_service.get(), db_id, label, 
table_id, txn_id));
+        CreateRowsetResponse res;
+        auto rowset = create_rowset(txn_id, tablet_id, partition_id);
+        rowset.mutable_load_id()->set_hi(123);
+        rowset.mutable_load_id()->set_lo(456);
+        ASSERT_NO_FATAL_FAILURE(prepare_rowset(meta_service.get(), rowset, 
res));
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;
+        res.Clear();
+        ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), rowset, 
res));
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;
+        res.Clear();
+
+        ASSERT_NO_FATAL_FAILURE(put_recycle_rowset(rowset));
+        ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), rowset, 
res));
+        ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT) << 
res.status().msg();
+        ASSERT_TRUE(res.status().msg().find("mutually exclusive") != 
std::string::npos)
+                << res.status().msg();
+    }
+
+    {
+        int64_t table_id = 2;
+        int64_t partition_id = 2;
+        int64_t tablet_id = 2;
+        int64_t db_id = 100202;
+        std::string label = "test_commit_rowset_without_tmp_or_recycle_key";
+        create_tablet(meta_service.get(), table_id, 1, partition_id, 
tablet_id);
+
+        int64_t txn_id = 0;
+        ASSERT_NO_FATAL_FAILURE(begin_txn(meta_service.get(), db_id, label, 
table_id, txn_id));
+        CreateRowsetResponse res;
+        auto rowset = create_rowset(txn_id, tablet_id, partition_id);
+        rowset.mutable_load_id()->set_hi(789);
+        rowset.mutable_load_id()->set_lo(101112);
+
+        ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), rowset, 
res));
+        ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT) << 
res.status().msg();
+        ASSERT_TRUE(res.status().msg().find("recycle rowset key not found") != 
std::string::npos)
+                << res.status().msg();
+    }
+}
+
 TEST(MetaServiceTest, AlterObjInfoTest) {
     auto meta_service = get_meta_service();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to