This is an automated email from the ASF dual-hosted git repository.
w41ter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 887d9cac57f [feat](cloud) index_exists/partition_exists support
versioned read (#54597)
887d9cac57f is described below
commit 887d9cac57ff37d1302eecf73dad5727e07b1418
Author: walter <[email protected]>
AuthorDate: Tue Aug 12 20:30:33 2025 +0800
[feat](cloud) index_exists/partition_exists support versioned read (#54597)
---
cloud/src/meta-service/meta_service_partition.cpp | 17 ++-
cloud/src/meta-store/meta_reader.cpp | 59 ++++++++++
cloud/src/meta-store/meta_reader.h | 25 +++++
cloud/test/meta_reader_test.cpp | 79 +++++++++++++
cloud/test/meta_service_versioned_read_test.cpp | 129 ++++++++++++++++++++++
5 files changed, 307 insertions(+), 2 deletions(-)
diff --git a/cloud/src/meta-service/meta_service_partition.cpp b/cloud/src/meta-service/meta_service_partition.cpp
index b02ff532c4b..2aff744d2ff 100644
--- a/cloud/src/meta-service/meta_service_partition.cpp
+++ b/cloud/src/meta-service/meta_service_partition.cpp
@@ -26,6 +26,7 @@
#include "meta-service/meta_service_helper.h"
#include "meta-store/document_message.h"
#include "meta-store/keys.h"
+#include "meta-store/meta_reader.h"
#include "meta-store/txn_kv_error.h"
#include "meta-store/versioned_value.h"
#include "meta_service.h"
@@ -75,7 +76,13 @@ static TxnErrorCode index_exists(Transaction* txn, const std::string& instance_i
}
return it->has_next() ? TxnErrorCode::TXN_OK :
TxnErrorCode::TXN_KEY_NOT_FOUND;
} else {
- CHECK(false) << "versioned read is not supported yet";
+ MetaReader reader(instance_id);
+ TxnErrorCode err = reader.get_index_index(txn, req->index_ids(0),
nullptr);
+ if (err != TxnErrorCode::TXN_OK && err !=
TxnErrorCode::TXN_KEY_NOT_FOUND) {
+ LOG_WARNING("failed to get index index key").tag("err", err);
+ return err;
+ }
+ return err;
}
}
@@ -428,7 +435,13 @@ static TxnErrorCode partition_exists(Transaction* txn, const std::string& instan
}
return it->has_next() ? TxnErrorCode::TXN_OK :
TxnErrorCode::TXN_KEY_NOT_FOUND;
} else {
- CHECK(false) << "versioned read is not supported yet";
+ MetaReader reader(instance_id);
+ TxnErrorCode err = reader.get_partition_index(txn,
req->partition_ids(0), nullptr);
+ if (err != TxnErrorCode::TXN_OK && err !=
TxnErrorCode::TXN_KEY_NOT_FOUND) {
+ LOG_WARNING("failed to get partition index key").tag("err", err);
+ return err;
+ }
+ return err;
}
}
diff --git a/cloud/src/meta-store/meta_reader.cpp b/cloud/src/meta-store/meta_reader.cpp
index 4fcf7cadbb2..d1b32a23a30 100644
--- a/cloud/src/meta-store/meta_reader.cpp
+++ b/cloud/src/meta-store/meta_reader.cpp
@@ -560,4 +560,63 @@ TxnErrorCode MetaReader::get_partition_pending_txn_id(Transaction* txn, int64_t
return TxnErrorCode::TXN_OK;
}
+// Convenience overload of get_index_index() that opens its own transaction
+// through txn_kv_ and forwards to the Transaction*-taking overload.
+//
+// A MetaReader built with the single-argument constructor forwards nullptr as
+// its TxnKv, so there is nothing to create a transaction from. That case is
+// rejected with TXN_INVALID_ARGUMENT instead of dereferencing a null pointer;
+// such readers must use the overload that accepts a caller-owned Transaction*.
+//
+// Returns the create_txn() error when the transaction cannot be created,
+// otherwise whatever the Transaction*-taking overload returns.
+TxnErrorCode MetaReader::get_index_index(int64_t index_id, IndexIndexPB* index, bool snapshot) {
+    if (txn_kv_ == nullptr) {
+        return TxnErrorCode::TXN_INVALID_ARGUMENT;
+    }
+
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        return err;
+    }
+    return get_index_index(txn.get(), index_id, index, snapshot);
+}
+
+// Reads the versioned index-index record of `index_id` inside the caller's
+// transaction.
+//
+// The raw value is fetched with txn->get() (honouring `snapshot`); any non-OK
+// code from that read, TXN_KEY_NOT_FOUND included, is handed back unchanged.
+// `index` is optional: when it is null the call only tests that the key
+// exists, otherwise the value is decoded into it and TXN_INVALID_DATA is
+// returned if decoding fails.
+TxnErrorCode MetaReader::get_index_index(Transaction* txn, int64_t index_id, IndexIndexPB* index,
+                                         bool snapshot) {
+    const std::string key = versioned::index_index_key({instance_id_, index_id});
+
+    std::string raw;
+    if (TxnErrorCode rc = txn->get(key, &raw, snapshot); rc != TxnErrorCode::TXN_OK) {
+        return rc;
+    }
+
+    // Either an existence-only probe, or the payload decoded cleanly.
+    if (index == nullptr || index->ParseFromString(raw)) {
+        return TxnErrorCode::TXN_OK;
+    }
+
+    LOG_ERROR("Failed to parse IndexIndexPB")
+            .tag("instance_id", instance_id_)
+            .tag("index_id", index_id)
+            .tag("key", hex(key));
+    return TxnErrorCode::TXN_INVALID_DATA;
+}
+
+// Convenience overload of get_partition_index() that opens its own
+// transaction through txn_kv_ and forwards to the Transaction*-taking
+// overload.
+//
+// A MetaReader built with the single-argument constructor forwards nullptr as
+// its TxnKv, so there is nothing to create a transaction from. That case is
+// rejected with TXN_INVALID_ARGUMENT instead of dereferencing a null pointer;
+// such readers must use the overload that accepts a caller-owned Transaction*.
+//
+// Returns the create_txn() error when the transaction cannot be created,
+// otherwise whatever the Transaction*-taking overload returns.
+TxnErrorCode MetaReader::get_partition_index(int64_t partition_id,
+                                             PartitionIndexPB* partition_index, bool snapshot) {
+    if (txn_kv_ == nullptr) {
+        return TxnErrorCode::TXN_INVALID_ARGUMENT;
+    }
+
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        return err;
+    }
+    return get_partition_index(txn.get(), partition_id, partition_index, snapshot);
+}
+
+// Reads the versioned partition-index record of `partition_id` inside the
+// caller's transaction.
+//
+// The raw value is fetched with txn->get() (honouring `snapshot`); any non-OK
+// code from that read, TXN_KEY_NOT_FOUND included, is handed back unchanged.
+// `partition_index` is optional: when it is null the call only tests that the
+// key exists, otherwise the value is decoded into it and TXN_INVALID_DATA is
+// returned if decoding fails.
+TxnErrorCode MetaReader::get_partition_index(Transaction* txn, int64_t partition_id,
+                                             PartitionIndexPB* partition_index, bool snapshot) {
+    const std::string key = versioned::partition_index_key({instance_id_, partition_id});
+
+    std::string raw;
+    if (TxnErrorCode rc = txn->get(key, &raw, snapshot); rc != TxnErrorCode::TXN_OK) {
+        return rc;
+    }
+
+    // Either an existence-only probe, or the payload decoded cleanly.
+    if (partition_index == nullptr || partition_index->ParseFromString(raw)) {
+        return TxnErrorCode::TXN_OK;
+    }
+
+    LOG_ERROR("Failed to parse PartitionIndexPB")
+            .tag("instance_id", instance_id_)
+            .tag("partition_id", partition_id)
+            .tag("key", hex(key));
+    return TxnErrorCode::TXN_INVALID_DATA;
+}
+
} // namespace doris::cloud
diff --git a/cloud/src/meta-store/meta_reader.h b/cloud/src/meta-store/meta_reader.h
index 76a2c61be2e..3ee2e549b76 100644
--- a/cloud/src/meta-store/meta_reader.h
+++ b/cloud/src/meta-store/meta_reader.h
@@ -35,6 +35,7 @@ namespace doris::cloud {
// throughout the lifetime of the MetaReader instance.
class MetaReader {
public:
+    // Creates a reader that has no TxnKv: nullptr is forwarded as `txn_kv`.
+    // Intended for callers that already hold a Transaction and use the
+    // Transaction*-taking getters (e.g. get_index_index(txn, ...)); getters that
+    // open their own transaction need a TxnKv and must not be used on it.
+    // `explicit` keeps a bare string from silently converting into a MetaReader.
+    explicit MetaReader(std::string_view instance_id) : MetaReader(instance_id, nullptr) {}
MetaReader(std::string_view instance_id, TxnKv* txn_kv)
: MetaReader(instance_id, txn_kv, Versionstamp::max(), false) {}
MetaReader(std::string_view instance_id, TxnKv* txn_kv, Versionstamp
snapshot_version)
@@ -256,6 +257,30 @@ public:
return get_partition_pending_txn_id(txn, partition_id, first_txn_id,
snapshot_);
}
+ // Get the IndexIndexPB stored for the given index id; `index` may be nullptr to only test that the key exists.
+ TxnErrorCode get_index_index(int64_t index_id, IndexIndexPB* index, bool
snapshot); // opens its own transaction through the reader's TxnKv
+ TxnErrorCode get_index_index(Transaction* txn, int64_t index_id,
IndexIndexPB* index,
+ bool snapshot); // reads inside the caller-supplied `txn`
+ TxnErrorCode get_index_index(int64_t index_id, IndexIndexPB* index) {
+ return get_index_index(index_id, index, snapshot_); // same lookup, using the reader's snapshot_ flag
+ }
+ TxnErrorCode get_index_index(Transaction* txn, int64_t index_id,
IndexIndexPB* index) {
+ return get_index_index(txn, index_id, index, snapshot_); // same lookup, using the reader's snapshot_ flag
+ }
+
+ // Get the PartitionIndexPB stored for the given partition_id; `partition_index` may be nullptr to only test that the key exists.
+ TxnErrorCode get_partition_index(int64_t partition_id, PartitionIndexPB*
partition_index,
+ bool snapshot); // opens its own transaction through the reader's TxnKv
+ TxnErrorCode get_partition_index(Transaction* txn, int64_t partition_id,
+ PartitionIndexPB* partition_index, bool
snapshot); // reads inside the caller-supplied `txn`
+ TxnErrorCode get_partition_index(int64_t partition_id, PartitionIndexPB*
partition_index) {
+ return get_partition_index(partition_id, partition_index, snapshot_); // same lookup, using the reader's snapshot_ flag
+ }
+ TxnErrorCode get_partition_index(Transaction* txn, int64_t partition_id,
+ PartitionIndexPB* partition_index) {
+ return get_partition_index(txn, partition_id, partition_index,
snapshot_); // same lookup, using the reader's snapshot_ flag
+ }
+
private:
const std::string_view instance_id_;
const Versionstamp snapshot_version_;
diff --git a/cloud/test/meta_reader_test.cpp b/cloud/test/meta_reader_test.cpp
index a8cfc7beb97..c30ae9a84bb 100644
--- a/cloud/test/meta_reader_test.cpp
+++ b/cloud/test/meta_reader_test.cpp
@@ -1221,3 +1221,82 @@ TEST(MetaReaderTest, GetPartitionPendingTxnId) {
ASSERT_EQ(first_txn_id, 2001);
}
}
+
+TEST(MetaReaderTest, GetIndexIndex) { // get_index_index(): missing key first, then a stored IndexIndexPB is read back
+ auto txn_kv = std::make_shared<MemTxnKv>();
+ ASSERT_EQ(txn_kv->init(), 0);
+
+ std::string instance_id = "test_instance"; // key components shared by all three phases
+ int64_t index_id = 3001;
+
+ {
+ // NOT FOUND: nothing has been written yet, so the lookup must report TXN_KEY_NOT_FOUND.
+ MetaReader meta_reader(instance_id, txn_kv.get());
+ IndexIndexPB index;
+ TxnErrorCode err = meta_reader.get_index_index(index_id, &index);
+ ASSERT_EQ(err, TxnErrorCode::TXN_KEY_NOT_FOUND);
+ }
+
+ {
+ // Put an index index: write a serialized IndexIndexPB directly under the versioned index-index key.
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ std::string index_index_key = versioned::index_index_key({instance_id,
index_id});
+ IndexIndexPB index_index;
+ index_index.set_db_id(1001);
+ index_index.set_table_id(2001);
+ txn->put(index_index_key, index_index.SerializeAsString());
+ ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+ }
+
+ { // Read back: a fresh reader must now find the key and decode the fields written above.
+ MetaReader meta_reader(instance_id, txn_kv.get());
+ IndexIndexPB index;
+ TxnErrorCode err = meta_reader.get_index_index(index_id, &index);
+ ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+ ASSERT_EQ(index.db_id(), 1001);
+ ASSERT_EQ(index.table_id(), 2001);
+ }
+}
+
+TEST(MetaReaderTest, GetPartitionIndex) { // get_partition_index(): missing key first, then a stored PartitionIndexPB is read back
+ auto txn_kv = std::make_shared<MemTxnKv>();
+ ASSERT_EQ(txn_kv->init(), 0);
+
+ std::string instance_id = "test_instance"; // key components shared by all three phases
+ int64_t partition_id = 4001;
+
+ {
+ // NOT FOUND: nothing has been written yet, so the lookup must report TXN_KEY_NOT_FOUND.
+ MetaReader meta_reader(instance_id, txn_kv.get());
+ PartitionIndexPB partition_index;
+ TxnErrorCode err = meta_reader.get_partition_index(partition_id,
&partition_index);
+ ASSERT_EQ(err, TxnErrorCode::TXN_KEY_NOT_FOUND);
+ }
+
+ {
+ // Put a partition index: serialize a PartitionIndexPB and write it under the versioned partition-index key.
+ PartitionIndexPB partition_index_pb;
+ partition_index_pb.set_db_id(100);
+ partition_index_pb.set_table_id(200);
+
+ std::string partition_index_value;
+
ASSERT_TRUE(partition_index_pb.SerializeToString(&partition_index_value));
+
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ std::string partition_index_key =
+ versioned::partition_index_key({instance_id, partition_id});
+ txn->put(partition_index_key, partition_index_value);
+ ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+ }
+
+ { // Read back: a fresh reader must now find the key and decode the fields written above.
+ MetaReader meta_reader(instance_id, txn_kv.get());
+ PartitionIndexPB partition_index;
+ TxnErrorCode err = meta_reader.get_partition_index(partition_id,
&partition_index);
+ ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+ ASSERT_EQ(partition_index.db_id(), 100);
+ ASSERT_EQ(partition_index.table_id(), 200);
+ }
+}
diff --git a/cloud/test/meta_service_versioned_read_test.cpp b/cloud/test/meta_service_versioned_read_test.cpp
index e2368245e7e..1638c7927e1 100644
--- a/cloud/test/meta_service_versioned_read_test.cpp
+++ b/cloud/test/meta_service_versioned_read_test.cpp
@@ -647,4 +647,133 @@ TEST(MetaServiceVersionedReadTest, UpdateTablet) {
get_and_check_tablet_meta(tablet_id1, 300, true, true);
}
+TEST(MetaServiceVersionedReadTest, IndexRequest) { // prepare/commit one index, then repeat both RPCs against the committed index
+ auto meta_service = get_meta_service(false); // NOTE(review): meaning of `false` is not visible here — confirm what it configures
+ std::string instance_id = "commit_index";
+
+ MOCK_GET_INSTANCE_ID(instance_id);
+ create_and_refresh_instance(meta_service.get(), instance_id);
+
+ constexpr int64_t db_id = 123; // ids reused by every request below
+ constexpr int64_t table_id = 10001;
+ constexpr int64_t index_id = 10002;
+
+ {
+ // Prepare index: first prepare of a not-yet-existing index is expected to return OK.
+ brpc::Controller ctrl;
+ IndexRequest req;
+ IndexResponse res;
+ req.set_db_id(db_id);
+ req.set_table_id(table_id);
+ req.add_index_ids(index_id);
+ meta_service->prepare_index(&ctrl, &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK) <<
res.status().DebugString();
+ }
+
+ {
+ // Commit index: commit the prepared index (flagged as belonging to a new table); expected OK.
+ brpc::Controller ctrl;
+ IndexRequest req;
+ IndexResponse res;
+ req.set_db_id(db_id);
+ req.set_table_id(table_id);
+ req.add_index_ids(index_id);
+ req.set_is_new_table(true);
+ meta_service->commit_index(&ctrl, &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK) <<
res.status().DebugString();
+ }
+
+ {
+ // Prepare index again: the index is committed by now, so the service must answer ALREADY_EXISTED.
+ brpc::Controller ctrl;
+ IndexRequest req;
+ IndexResponse res;
+ req.set_db_id(db_id);
+ req.set_table_id(table_id);
+ req.add_index_ids(index_id);
+ meta_service->prepare_index(&ctrl, &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::ALREADY_EXISTED)
+ << res.status().DebugString();
+ }
+
+ {
+ // Commit index again: repeating the commit of an already committed index is expected to return OK.
+ brpc::Controller ctrl;
+ IndexRequest req;
+ IndexResponse res;
+ req.set_db_id(db_id);
+ req.set_table_id(table_id);
+ req.add_index_ids(index_id);
+ req.set_is_new_table(true);
+ meta_service->commit_index(&ctrl, &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK) <<
res.status().DebugString();
+ }
+}
+
+TEST(MetaServiceVersionedReadTest, PartitionRequest) { // prepare/commit one partition, then repeat both RPCs against the committed partition
+ auto meta_service = get_meta_service(false); // NOTE(review): meaning of `false` is not visible here — confirm what it configures
+ std::string instance_id = "PartitionRequest";
+
+ MOCK_GET_INSTANCE_ID(instance_id);
+ create_and_refresh_instance(meta_service.get(), instance_id);
+
+ constexpr int64_t db_id = 1; // ids reused by every request below
+ constexpr int64_t table_id = 10001;
+ constexpr int64_t index_id = 10002;
+ constexpr int64_t partition_id = 10003;
+
+ {
+ // Prepare partition: first prepare of a not-yet-existing partition is expected to return OK.
+ brpc::Controller ctrl;
+ PartitionRequest req;
+ PartitionResponse res;
+ req.set_db_id(db_id);
+ req.set_table_id(table_id);
+ req.add_index_ids(index_id);
+ req.add_partition_ids(partition_id);
+ meta_service->prepare_partition(&ctrl, &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK) <<
res.status().DebugString();
+ }
+
+ {
+ // Commit partition: commit the prepared partition; expected OK.
+ brpc::Controller ctrl;
+ PartitionRequest req;
+ PartitionResponse res;
+ req.set_db_id(db_id);
+ req.set_table_id(table_id);
+ req.add_index_ids(index_id);
+ req.add_partition_ids(partition_id);
+ meta_service->commit_partition(&ctrl, &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK) <<
res.status().DebugString();
+ }
+
+ {
+ // Prepare partition again: the partition is committed by now, so the service must answer ALREADY_EXISTED.
+ brpc::Controller ctrl;
+ PartitionRequest req;
+ PartitionResponse res;
+ req.set_db_id(db_id);
+ req.set_table_id(table_id);
+ req.add_index_ids(index_id);
+ req.add_partition_ids(partition_id);
+ meta_service->prepare_partition(&ctrl, &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::ALREADY_EXISTED)
+ << res.status().DebugString();
+ }
+
+ {
+ // Commit partition again: repeating the commit of an already committed partition is expected to return OK.
+ brpc::Controller ctrl;
+ PartitionRequest req;
+ PartitionResponse res;
+ req.set_db_id(db_id);
+ req.set_table_id(table_id);
+ req.add_index_ids(index_id);
+ req.add_partition_ids(partition_id);
+ meta_service->commit_partition(&ctrl, &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK) <<
res.status().DebugString();
+ }
+}
+
} // namespace doris::cloud
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]