This is an automated email from the ASF dual-hosted git repository.

w41ter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 887d9cac57f [feat](cloud) index_exists/partition_exists support versioned read (#54597)
887d9cac57f is described below

commit 887d9cac57ff37d1302eecf73dad5727e07b1418
Author: walter <[email protected]>
AuthorDate: Tue Aug 12 20:30:33 2025 +0800

    [feat](cloud) index_exists/partition_exists support versioned read (#54597)
---
 cloud/src/meta-service/meta_service_partition.cpp |  17 ++-
 cloud/src/meta-store/meta_reader.cpp              |  59 ++++++++++
 cloud/src/meta-store/meta_reader.h                |  25 +++++
 cloud/test/meta_reader_test.cpp                   |  79 +++++++++++++
 cloud/test/meta_service_versioned_read_test.cpp   | 129 ++++++++++++++++++++++
 5 files changed, 307 insertions(+), 2 deletions(-)

diff --git a/cloud/src/meta-service/meta_service_partition.cpp b/cloud/src/meta-service/meta_service_partition.cpp
index b02ff532c4b..2aff744d2ff 100644
--- a/cloud/src/meta-service/meta_service_partition.cpp
+++ b/cloud/src/meta-service/meta_service_partition.cpp
@@ -26,6 +26,7 @@
 #include "meta-service/meta_service_helper.h"
 #include "meta-store/document_message.h"
 #include "meta-store/keys.h"
+#include "meta-store/meta_reader.h"
 #include "meta-store/txn_kv_error.h"
 #include "meta-store/versioned_value.h"
 #include "meta_service.h"
@@ -75,7 +76,13 @@ static TxnErrorCode index_exists(Transaction* txn, const std::string& instance_i
         }
         return it->has_next() ? TxnErrorCode::TXN_OK : 
TxnErrorCode::TXN_KEY_NOT_FOUND;
     } else {
-        CHECK(false) << "versioned read is not supported yet";
+        MetaReader reader(instance_id);
+        TxnErrorCode err = reader.get_index_index(txn, req->index_ids(0), 
nullptr);
+        if (err != TxnErrorCode::TXN_OK && err != 
TxnErrorCode::TXN_KEY_NOT_FOUND) {
+            LOG_WARNING("failed to get index index key").tag("err", err);
+            return err;
+        }
+        return err;
     }
 }
 
@@ -428,7 +435,13 @@ static TxnErrorCode partition_exists(Transaction* txn, const std::string& instan
         }
         return it->has_next() ? TxnErrorCode::TXN_OK : 
TxnErrorCode::TXN_KEY_NOT_FOUND;
     } else {
-        CHECK(false) << "versioned read is not supported yet";
+        MetaReader reader(instance_id);
+        TxnErrorCode err = reader.get_partition_index(txn, 
req->partition_ids(0), nullptr);
+        if (err != TxnErrorCode::TXN_OK && err != 
TxnErrorCode::TXN_KEY_NOT_FOUND) {
+            LOG_WARNING("failed to get partition index key").tag("err", err);
+            return err;
+        }
+        return err;
     }
 }
 
diff --git a/cloud/src/meta-store/meta_reader.cpp b/cloud/src/meta-store/meta_reader.cpp
index 4fcf7cadbb2..d1b32a23a30 100644
--- a/cloud/src/meta-store/meta_reader.cpp
+++ b/cloud/src/meta-store/meta_reader.cpp
@@ -560,4 +560,63 @@ TxnErrorCode MetaReader::get_partition_pending_txn_id(Transaction* txn, int64_t
     return TxnErrorCode::TXN_OK;
 }
 
+TxnErrorCode MetaReader::get_index_index(int64_t index_id, IndexIndexPB* 
index, bool snapshot) {
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        return err;
+    }
+    return get_index_index(txn.get(), index_id, index, snapshot);
+}
+
+TxnErrorCode MetaReader::get_index_index(Transaction* txn, int64_t index_id, 
IndexIndexPB* index,
+                                         bool snapshot) {
+    std::string index_index_key = versioned::index_index_key({instance_id_, 
index_id});
+    std::string value;
+    TxnErrorCode err = txn->get(index_index_key, &value, snapshot);
+    if (err != TxnErrorCode::TXN_OK) {
+        return err;
+    }
+
+    if (index && !index->ParseFromString(value)) {
+        LOG_ERROR("Failed to parse IndexIndexPB")
+                .tag("instance_id", instance_id_)
+                .tag("index_id", index_id)
+                .tag("key", hex(index_index_key));
+        return TxnErrorCode::TXN_INVALID_DATA;
+    }
+
+    return TxnErrorCode::TXN_OK;
+}
+
+TxnErrorCode MetaReader::get_partition_index(int64_t partition_id,
+                                             PartitionIndexPB* 
partition_index, bool snapshot) {
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        return err;
+    }
+    return get_partition_index(txn.get(), partition_id, partition_index, 
snapshot);
+}
+
+TxnErrorCode MetaReader::get_partition_index(Transaction* txn, int64_t 
partition_id,
+                                             PartitionIndexPB* 
partition_index, bool snapshot) {
+    std::string partition_index_key = 
versioned::partition_index_key({instance_id_, partition_id});
+    std::string value;
+    TxnErrorCode err = txn->get(partition_index_key, &value, snapshot);
+    if (err != TxnErrorCode::TXN_OK) {
+        return err;
+    }
+
+    if (partition_index && !partition_index->ParseFromString(value)) {
+        LOG_ERROR("Failed to parse PartitionIndexPB")
+                .tag("instance_id", instance_id_)
+                .tag("partition_id", partition_id)
+                .tag("key", hex(partition_index_key));
+        return TxnErrorCode::TXN_INVALID_DATA;
+    }
+
+    return TxnErrorCode::TXN_OK;
+}
+
 } // namespace doris::cloud
diff --git a/cloud/src/meta-store/meta_reader.h b/cloud/src/meta-store/meta_reader.h
index 76a2c61be2e..3ee2e549b76 100644
--- a/cloud/src/meta-store/meta_reader.h
+++ b/cloud/src/meta-store/meta_reader.h
@@ -35,6 +35,7 @@ namespace doris::cloud {
 // throughout the lifetime of the MetaReader instance.
 class MetaReader {
 public:
+    MetaReader(std::string_view instance_id) : MetaReader(instance_id, 
nullptr) {}
     MetaReader(std::string_view instance_id, TxnKv* txn_kv)
             : MetaReader(instance_id, txn_kv, Versionstamp::max(), false) {}
     MetaReader(std::string_view instance_id, TxnKv* txn_kv, Versionstamp 
snapshot_version)
@@ -256,6 +257,30 @@ public:
         return get_partition_pending_txn_id(txn, partition_id, first_txn_id, 
snapshot_);
     }
 
+    // Get the index of the given index id.
+    TxnErrorCode get_index_index(int64_t index_id, IndexIndexPB* index, bool 
snapshot);
+    TxnErrorCode get_index_index(Transaction* txn, int64_t index_id, 
IndexIndexPB* index,
+                                 bool snapshot);
+    TxnErrorCode get_index_index(int64_t index_id, IndexIndexPB* index) {
+        return get_index_index(index_id, index, snapshot_);
+    }
+    TxnErrorCode get_index_index(Transaction* txn, int64_t index_id, 
IndexIndexPB* index) {
+        return get_index_index(txn, index_id, index, snapshot_);
+    }
+
+    // Get the partition index for the given partition_id.
+    TxnErrorCode get_partition_index(int64_t partition_id, PartitionIndexPB* 
partition_index,
+                                     bool snapshot);
+    TxnErrorCode get_partition_index(Transaction* txn, int64_t partition_id,
+                                     PartitionIndexPB* partition_index, bool 
snapshot);
+    TxnErrorCode get_partition_index(int64_t partition_id, PartitionIndexPB* 
partition_index) {
+        return get_partition_index(partition_id, partition_index, snapshot_);
+    }
+    TxnErrorCode get_partition_index(Transaction* txn, int64_t partition_id,
+                                     PartitionIndexPB* partition_index) {
+        return get_partition_index(txn, partition_id, partition_index, 
snapshot_);
+    }
+
 private:
     const std::string_view instance_id_;
     const Versionstamp snapshot_version_;
diff --git a/cloud/test/meta_reader_test.cpp b/cloud/test/meta_reader_test.cpp
index a8cfc7beb97..c30ae9a84bb 100644
--- a/cloud/test/meta_reader_test.cpp
+++ b/cloud/test/meta_reader_test.cpp
@@ -1221,3 +1221,82 @@ TEST(MetaReaderTest, GetPartitionPendingTxnId) {
         ASSERT_EQ(first_txn_id, 2001);
     }
 }
+
+TEST(MetaReaderTest, GetIndexIndex) {
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    ASSERT_EQ(txn_kv->init(), 0);
+
+    std::string instance_id = "test_instance";
+    int64_t index_id = 3001;
+
+    {
+        // NOT FOUND
+        MetaReader meta_reader(instance_id, txn_kv.get());
+        IndexIndexPB index;
+        TxnErrorCode err = meta_reader.get_index_index(index_id, &index);
+        ASSERT_EQ(err, TxnErrorCode::TXN_KEY_NOT_FOUND);
+    }
+
+    {
+        // Put an index index
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        std::string index_index_key = versioned::index_index_key({instance_id, 
index_id});
+        IndexIndexPB index_index;
+        index_index.set_db_id(1001);
+        index_index.set_table_id(2001);
+        txn->put(index_index_key, index_index.SerializeAsString());
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+    }
+
+    {
+        MetaReader meta_reader(instance_id, txn_kv.get());
+        IndexIndexPB index;
+        TxnErrorCode err = meta_reader.get_index_index(index_id, &index);
+        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+        ASSERT_EQ(index.db_id(), 1001);
+        ASSERT_EQ(index.table_id(), 2001);
+    }
+}
+
+TEST(MetaReaderTest, GetPartitionIndex) {
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    ASSERT_EQ(txn_kv->init(), 0);
+
+    std::string instance_id = "test_instance";
+    int64_t partition_id = 4001;
+
+    {
+        // NOT FOUND
+        MetaReader meta_reader(instance_id, txn_kv.get());
+        PartitionIndexPB partition_index;
+        TxnErrorCode err = meta_reader.get_partition_index(partition_id, 
&partition_index);
+        ASSERT_EQ(err, TxnErrorCode::TXN_KEY_NOT_FOUND);
+    }
+
+    {
+        // Put a partition index
+        PartitionIndexPB partition_index_pb;
+        partition_index_pb.set_db_id(100);
+        partition_index_pb.set_table_id(200);
+
+        std::string partition_index_value;
+        
ASSERT_TRUE(partition_index_pb.SerializeToString(&partition_index_value));
+
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        std::string partition_index_key =
+                versioned::partition_index_key({instance_id, partition_id});
+        txn->put(partition_index_key, partition_index_value);
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+    }
+
+    {
+        MetaReader meta_reader(instance_id, txn_kv.get());
+        PartitionIndexPB partition_index;
+        TxnErrorCode err = meta_reader.get_partition_index(partition_id, 
&partition_index);
+        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+        ASSERT_EQ(partition_index.db_id(), 100);
+        ASSERT_EQ(partition_index.table_id(), 200);
+    }
+}
diff --git a/cloud/test/meta_service_versioned_read_test.cpp b/cloud/test/meta_service_versioned_read_test.cpp
index e2368245e7e..1638c7927e1 100644
--- a/cloud/test/meta_service_versioned_read_test.cpp
+++ b/cloud/test/meta_service_versioned_read_test.cpp
@@ -647,4 +647,133 @@ TEST(MetaServiceVersionedReadTest, UpdateTablet) {
     get_and_check_tablet_meta(tablet_id1, 300, true, true);
 }
 
+TEST(MetaServiceVersionedReadTest, IndexRequest) {
+    auto meta_service = get_meta_service(false);
+    std::string instance_id = "commit_index";
+
+    MOCK_GET_INSTANCE_ID(instance_id);
+    create_and_refresh_instance(meta_service.get(), instance_id);
+
+    constexpr int64_t db_id = 123;
+    constexpr int64_t table_id = 10001;
+    constexpr int64_t index_id = 10002;
+
+    {
+        // Prepare index
+        brpc::Controller ctrl;
+        IndexRequest req;
+        IndexResponse res;
+        req.set_db_id(db_id);
+        req.set_table_id(table_id);
+        req.add_index_ids(index_id);
+        meta_service->prepare_index(&ctrl, &req, &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << 
res.status().DebugString();
+    }
+
+    {
+        // Commit index
+        brpc::Controller ctrl;
+        IndexRequest req;
+        IndexResponse res;
+        req.set_db_id(db_id);
+        req.set_table_id(table_id);
+        req.add_index_ids(index_id);
+        req.set_is_new_table(true);
+        meta_service->commit_index(&ctrl, &req, &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << 
res.status().DebugString();
+    }
+
+    {
+        // Prepare index again
+        brpc::Controller ctrl;
+        IndexRequest req;
+        IndexResponse res;
+        req.set_db_id(db_id);
+        req.set_table_id(table_id);
+        req.add_index_ids(index_id);
+        meta_service->prepare_index(&ctrl, &req, &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::ALREADY_EXISTED)
+                << res.status().DebugString();
+    }
+
+    {
+        // Commit index again
+        brpc::Controller ctrl;
+        IndexRequest req;
+        IndexResponse res;
+        req.set_db_id(db_id);
+        req.set_table_id(table_id);
+        req.add_index_ids(index_id);
+        req.set_is_new_table(true);
+        meta_service->commit_index(&ctrl, &req, &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << 
res.status().DebugString();
+    }
+}
+
+TEST(MetaServiceVersionedReadTest, PartitionRequest) {
+    auto meta_service = get_meta_service(false);
+    std::string instance_id = "PartitionRequest";
+
+    MOCK_GET_INSTANCE_ID(instance_id);
+    create_and_refresh_instance(meta_service.get(), instance_id);
+
+    constexpr int64_t db_id = 1;
+    constexpr int64_t table_id = 10001;
+    constexpr int64_t index_id = 10002;
+    constexpr int64_t partition_id = 10003;
+
+    {
+        // Prepare transaction
+        brpc::Controller ctrl;
+        PartitionRequest req;
+        PartitionResponse res;
+        req.set_db_id(db_id);
+        req.set_table_id(table_id);
+        req.add_index_ids(index_id);
+        req.add_partition_ids(partition_id);
+        meta_service->prepare_partition(&ctrl, &req, &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << 
res.status().DebugString();
+    }
+
+    {
+        // Commit partition
+        brpc::Controller ctrl;
+        PartitionRequest req;
+        PartitionResponse res;
+        req.set_db_id(db_id);
+        req.set_table_id(table_id);
+        req.add_index_ids(index_id);
+        req.add_partition_ids(partition_id);
+        meta_service->commit_partition(&ctrl, &req, &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << 
res.status().DebugString();
+    }
+
+    {
+        // Prepare partition again
+        brpc::Controller ctrl;
+        PartitionRequest req;
+        PartitionResponse res;
+        req.set_db_id(db_id);
+        req.set_table_id(table_id);
+        req.add_index_ids(index_id);
+        req.add_partition_ids(partition_id);
+        meta_service->prepare_partition(&ctrl, &req, &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::ALREADY_EXISTED)
+                << res.status().DebugString();
+    }
+
+    {
+        // Commit partition again
+        brpc::Controller ctrl;
+        PartitionRequest req;
+        PartitionResponse res;
+        req.set_db_id(db_id);
+        req.set_table_id(table_id);
+        req.add_index_ids(index_id);
+        req.add_partition_ids(partition_id);
+        meta_service->commit_partition(&ctrl, &req, &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << 
res.status().DebugString();
+    }
+}
+
 } // namespace doris::cloud


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to