This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 271b5670c8e branch-4.0: [chore](cloud) Add skeleton func for clone
chain reader #59469 (#60278)
271b5670c8e is described below
commit 271b5670c8e33816551b9be8cc8c7007d4bf917a
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Jan 30 16:33:19 2026 +0800
branch-4.0: [chore](cloud) Add skeleton func for clone chain reader #59469
(#60278)
Cherry-picked from #59469
Co-authored-by: Yixuan Wang <[email protected]>
---
cloud/src/meta-store/clone_chain_reader.cpp | 129 ++++++++++++++++++++++++++++
cloud/src/meta-store/clone_chain_reader.h | 20 +++++
cloud/src/recycler/meta_checker.cpp | 4 +-
3 files changed, 151 insertions(+), 2 deletions(-)
diff --git a/cloud/src/meta-store/clone_chain_reader.cpp
b/cloud/src/meta-store/clone_chain_reader.cpp
index 06ed82847b0..86113c00a40 100644
--- a/cloud/src/meta-store/clone_chain_reader.cpp
+++ b/cloud/src/meta-store/clone_chain_reader.cpp
@@ -971,6 +971,70 @@ TxnErrorCode
CloneChainReader::get_load_rowset_meta(Transaction* txn, int64_t ta
} while (true);
}
+TxnErrorCode CloneChainReader::get_load_rowset_metas(
+ int64_t tablet_id, std::vector<std::pair<RowsetMetaCloudPB,
Versionstamp>>* rowset_metas,
+ bool snapshot) {
+ DCHECK(txn_kv_) << "TxnKv must be set before calling";
+ if (!txn_kv_) {
+ return TxnErrorCode::TXN_INVALID_ARGUMENT;
+ }
+
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ return err;
+ }
+ return get_load_rowset_metas(txn.get(), tablet_id, rowset_metas, snapshot);
+}
+
+TxnErrorCode CloneChainReader::get_load_rowset_metas(
+ Transaction* txn, int64_t tablet_id,
+ std::vector<std::pair<RowsetMetaCloudPB, Versionstamp>>* rowset_metas,
bool snapshot) {
+ std::string current_instance_id(instance_id_);
+ Versionstamp current_snapshot_version = snapshot_version_;
+ std::map<int64_t, std::pair<RowsetMetaCloudPB, Versionstamp>>
version_to_rowset;
+
+ do {
+ MetaReader reader(current_instance_id, current_snapshot_version);
+ std::vector<std::pair<RowsetMetaCloudPB, Versionstamp>>
current_rowsets;
+ TxnErrorCode err = reader.get_load_rowset_metas(txn, tablet_id,
&current_rowsets, snapshot);
+ if (err != TxnErrorCode::TXN_OK) {
+ return err;
+ }
+ min_read_versionstamp_ = std::min(reader.min_read_versionstamp(),
min_read_versionstamp_);
+
+ // Add found rowsets to version_to_rowset map (only if not already found)
+ for (auto&& rowset_pair : current_rowsets) {
+ int64_t version = rowset_pair.first.end_version();
+ if (!version_to_rowset.contains(version)) {
+ if (!rowset_pair.first.has_reference_instance_id()) {
+ rowset_pair.first.set_reference_instance_id(current_instance_id);
+ }
+ version_to_rowset[version] = std::move(rowset_pair);
+ }
+ }
+
+ // Try to find in previous clone chain
+ std::string prev_instance_id;
+ Versionstamp prev_snapshot_version;
+ if (!get_source_snapshot_info(current_instance_id, &prev_instance_id,
+ &prev_snapshot_version)) {
+ // no previous clone chain
+ break;
+ }
+ current_instance_id = std::move(prev_instance_id);
+ current_snapshot_version = prev_snapshot_version;
+ } while (true);
+
+ rowset_metas->clear();
+ rowset_metas->reserve(version_to_rowset.size());
+ for (auto&& [version, rowset_pair] : version_to_rowset) {
+ rowset_metas->push_back(std::move(rowset_pair));
+ }
+
+ return TxnErrorCode::TXN_OK;
+}
+
TxnErrorCode CloneChainReader::get_compact_rowset_meta(int64_t tablet_id,
int64_t version,
RowsetMetaCloudPB*
rowset_meta,
Versionstamp*
versionstamp, bool snapshot) {
@@ -1019,6 +1083,71 @@ TxnErrorCode
CloneChainReader::get_compact_rowset_meta(Transaction* txn, int64_t
} while (true);
}
+TxnErrorCode CloneChainReader::get_compact_rowset_metas(
+ int64_t tablet_id, std::vector<std::pair<RowsetMetaCloudPB,
Versionstamp>>* rowset_metas,
+ bool snapshot) {
+ DCHECK(txn_kv_) << "TxnKv must be set before calling";
+ if (!txn_kv_) {
+ return TxnErrorCode::TXN_INVALID_ARGUMENT;
+ }
+
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ return err;
+ }
+ return get_compact_rowset_metas(txn.get(), tablet_id, rowset_metas,
snapshot);
+}
+
+TxnErrorCode CloneChainReader::get_compact_rowset_metas(
+ Transaction* txn, int64_t tablet_id,
+ std::vector<std::pair<RowsetMetaCloudPB, Versionstamp>>* rowset_metas,
bool snapshot) {
+ std::string current_instance_id(instance_id_);
+ Versionstamp current_snapshot_version = snapshot_version_;
+ std::map<int64_t, std::pair<RowsetMetaCloudPB, Versionstamp>>
version_to_rowset;
+
+ do {
+ MetaReader reader(current_instance_id, current_snapshot_version);
+ std::vector<std::pair<RowsetMetaCloudPB, Versionstamp>>
current_rowsets;
+ TxnErrorCode err =
+ reader.get_compact_rowset_metas(txn, tablet_id,
&current_rowsets, snapshot);
+ if (err != TxnErrorCode::TXN_OK) {
+ return err;
+ }
+ min_read_versionstamp_ = std::min(reader.min_read_versionstamp(),
min_read_versionstamp_);
+
+ // Add found rowsets to version_to_rowset map (only if not already found)
+ for (auto&& rowset_pair : current_rowsets) {
+ int64_t version = rowset_pair.first.end_version();
+ if (!version_to_rowset.contains(version)) {
+ if (!rowset_pair.first.has_reference_instance_id()) {
+ rowset_pair.first.set_reference_instance_id(current_instance_id);
+ }
+ version_to_rowset[version] = std::move(rowset_pair);
+ }
+ }
+
+ // Try to find in previous clone chain
+ std::string prev_instance_id;
+ Versionstamp prev_snapshot_version;
+ if (!get_source_snapshot_info(current_instance_id, &prev_instance_id,
+ &prev_snapshot_version)) {
+ // no previous clone chain
+ break;
+ }
+ current_instance_id = std::move(prev_instance_id);
+ current_snapshot_version = prev_snapshot_version;
+ } while (true);
+
+ rowset_metas->clear();
+ rowset_metas->reserve(version_to_rowset.size());
+ for (auto&& [version, rowset_pair] : version_to_rowset) {
+ rowset_metas->push_back(std::move(rowset_pair));
+ }
+
+ return TxnErrorCode::TXN_OK;
+}
+
TxnErrorCode CloneChainReader::get_partition_pending_txn_id(int64_t
partition_id,
int64_t*
first_txn_id,
int64_t*
partition_version,
diff --git a/cloud/src/meta-store/clone_chain_reader.h
b/cloud/src/meta-store/clone_chain_reader.h
index 493812805ed..651a106d0ab 100644
--- a/cloud/src/meta-store/clone_chain_reader.h
+++ b/cloud/src/meta-store/clone_chain_reader.h
@@ -200,6 +200,16 @@ public:
RowsetMetaCloudPB* rowset_meta,
Versionstamp* versionstamp,
bool snapshot = false);
+ // Get the load rowset metas for the given tablet_id.
+ TxnErrorCode get_load_rowset_metas(
+ int64_t tablet_id,
+ std::vector<std::pair<RowsetMetaCloudPB, Versionstamp>>*
rowset_metas,
+ bool snapshot = false);
+ TxnErrorCode get_load_rowset_metas(
+ Transaction* txn, int64_t tablet_id,
+ std::vector<std::pair<RowsetMetaCloudPB, Versionstamp>>*
rowset_metas,
+ bool snapshot = false);
+
// Get the compact rowset meta for the given tablet_id and version.
TxnErrorCode get_compact_rowset_meta(int64_t tablet_id, int64_t version,
RowsetMetaCloudPB* rowset_meta, bool
snapshot = false) {
@@ -219,6 +229,16 @@ public:
RowsetMetaCloudPB* rowset_meta,
Versionstamp* versionstamp,
bool snapshot = false);
+ // Get the compact rowset metas for the given tablet_id.
+ TxnErrorCode get_compact_rowset_metas(
+ int64_t tablet_id,
+ std::vector<std::pair<RowsetMetaCloudPB, Versionstamp>>*
rowset_metas,
+ bool snapshot = false);
+ TxnErrorCode get_compact_rowset_metas(
+ Transaction* txn, int64_t tablet_id,
+ std::vector<std::pair<RowsetMetaCloudPB, Versionstamp>>*
rowset_metas,
+ bool snapshot = false);
+
// Get the tablet meta keys.
TxnErrorCode get_tablet_meta(int64_t tablet_id, TabletMetaCloudPB*
tablet_meta,
Versionstamp* versionstamp, bool snapshot =
false);
diff --git a/cloud/src/recycler/meta_checker.cpp
b/cloud/src/recycler/meta_checker.cpp
index 3fbd8ab451d..7364669c89f 100644
--- a/cloud/src/recycler/meta_checker.cpp
+++ b/cloud/src/recycler/meta_checker.cpp
@@ -41,8 +41,8 @@
namespace doris::cloud {
-MetaChecker::MetaChecker(std::shared_ptr<TxnKv> txn_kv) :
txn_kv_(std::move(txn_kv)) {
- snapshot_manager_ = std::make_shared<SnapshotManager>(txn_kv_);
+MetaChecker::MetaChecker(std::shared_ptr<TxnKv> txn_kv) : txn_kv_(txn_kv) {
+ snapshot_manager_ = std::make_shared<SnapshotManager>(std::move(txn_kv));
}
bool MetaChecker::scan_and_handle_kv(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]