w41ter commented on code in PR #56463:
URL: https://github.com/apache/doris/pull/56463#discussion_r2386541477


##########
cloud/src/meta-store/meta_reader.cpp:
##########
@@ -553,6 +553,124 @@ TxnErrorCode 
MetaReader::get_partition_versions(Transaction* txn,
     return TxnErrorCode::TXN_OK;
 }
 
+TxnErrorCode MetaReader::get_all_rowset_metas_at_versionstamp(
+        int64_t start_version, int64_t end_version, 
std::vector<RowsetMetaCloudPB>* rowset_metas,
+        bool snapshot) {
+    DCHECK(txn_kv_) << "TxnKv must be set before calling";
+    if (!txn_kv_) {
+        return TxnErrorCode::TXN_INVALID_ARGUMENT;
+    }
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        return err;
+    }
+    return get_all_rowset_metas_at_versionstamp(txn.get(), start_version, 
end_version, rowset_metas,
+                                                snapshot);
+}
+
+TxnErrorCode MetaReader::get_all_rowset_metas_at_versionstamp(
+        Transaction* txn, int64_t start_version, int64_t end_version,
+        std::vector<RowsetMetaCloudPB>* rowset_metas, bool snapshot) {
+    std::map<int64_t, std::map<int64_t, RowsetMetaCloudPB>> rowset_graph;
+
+    {
+        std::string start_key = versioned::meta_rowset_load_key({instance_id_, 
0, 0});
+        std::string end_key =
+                versioned::meta_rowset_load_key({instance_id_, 
std::numeric_limits<int64_t>::max(),
+                                                 
std::numeric_limits<int64_t>::max()});
+
+        versioned::ReadDocumentMessagesOptions options;
+        options.snapshot = snapshot;
+        options.snapshot_version = snapshot_version_;
+        options.exclude_begin_key = false;
+        options.exclude_end_key = false;
+
+        auto iter =
+                versioned::document_get_range<RowsetMetaCloudPB>(txn, 
start_key, end_key, options);
+        for (auto&& kvp = iter->next(); kvp.has_value(); kvp = iter->next()) {
+            auto&& [key, version, rowset_meta] = *kvp;
+
+            if (rowset_meta.start_version() >= start_version &&
+                rowset_meta.end_version() <= end_version) {
+                
rowset_graph[rowset_meta.tablet_id()].emplace(rowset_meta.end_version(),
+                                                              
std::move(rowset_meta));
+                min_read_versionstamp_ = std::min(min_read_versionstamp_, 
version);
+                DCHECK(version < snapshot_version_)
+                        << "version: " << version.to_string()
+                        << ", snapshot_version: " << 
snapshot_version_.to_string();
+            }
+        }
+        if (!iter->is_valid()) {
+            LOG_ERROR("failed to get loaded rowset metas at versionstamp")
+                    .tag("instance_id", instance_id_)
+                    .tag("start_version", start_version)
+                    .tag("end_version", end_version)
+                    .tag("error_code", iter->error_code());
+            return iter->error_code();
+        }
+    }
+
+    {
+        std::string start_key = 
versioned::meta_rowset_compact_key({instance_id_, 0, 0});
+        std::string end_key = versioned::meta_rowset_compact_key(
+                {instance_id_, std::numeric_limits<int64_t>::max(),
+                 std::numeric_limits<int64_t>::max()});
+
+        versioned::ReadDocumentMessagesOptions options;
+        options.snapshot = snapshot;
+        options.snapshot_version = snapshot_version_;
+        options.exclude_begin_key = false;
+        options.exclude_end_key = false;
+
+        int64_t last_start_version = std::numeric_limits<int64_t>::max();
+        auto iter =
+                versioned::document_get_range<RowsetMetaCloudPB>(txn, 
start_key, end_key, options);
+        for (auto&& kvp = iter->next(); kvp.has_value(); kvp = iter->next()) {
+            auto&& [key, version, rowset_meta] = *kvp;
+            DCHECK(version < snapshot_version_)
+                    << "version: " << version.to_string()
+                    << ", snapshot_version: " << snapshot_version_.to_string();
+
+            auto tablet_id = rowset_meta.tablet_id();
+
+            if (rowset_meta.start_version() >= start_version &&
+                rowset_meta.end_version() <= end_version) {
+                int64_t rowset_start_version = rowset_meta.start_version();
+                int64_t rowset_end_version = rowset_meta.end_version();
+                if (last_start_version <= rowset_start_version) {
+                    continue;
+                }
+
+                min_read_versionstamp_ = std::min(min_read_versionstamp_, 
version);
+                last_start_version = rowset_start_version;
+                rowset_graph[tablet_id].erase(
+                        
rowset_graph[tablet_id].lower_bound(rowset_start_version),
+                        
rowset_graph[tablet_id].upper_bound(rowset_end_version));
+                rowset_graph[tablet_id].emplace(rowset_end_version, 
std::move(rowset_meta));
+            }
+        }
+        if (!iter->is_valid()) {
+            LOG_ERROR("failed to get compacted rowset metas at versionstamp")
+                    .tag("instance_id", instance_id_)
+                    .tag("start_version", start_version)
+                    .tag("end_version", end_version)
+                    .tag("error_code", iter->error_code());
+            return iter->error_code();
+        }
+    }
+
+    rowset_metas->clear();
+    rowset_metas->reserve(rowset_graph.size());

Review Comment:
   ```suggestion
   ```
   
   It is meaningless.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to