dataroaring commented on code in PR #53467:
URL: https://github.com/apache/doris/pull/53467#discussion_r2373998942


##########
be/src/olap/version_graph.h:
##########
@@ -283,4 +290,202 @@ class TimestampedVersionTracker {
     VersionGraph _version_graph;
 };
 
+void VersionGraph::reconstruct_version_graph(RowsetMetaSPtrRange auto&& 
rs_metas,
+                                             int64_t* max_version) {
+    _version_graph.clear();
+    _vertex_index_map.clear();
+
+    construct_version_graph(rs_metas, max_version);
+}
+
+void VersionGraph::construct_version_graph(RowsetMetaSPtrRange auto&& rs_metas,
+                                           int64_t* max_version) {
+    if (std::ranges::empty(rs_metas)) {
+        VLOG_NOTICE << "there is no version in the header.";
+        return;
+    }
+
+    // Distill vertex values from versions in TabletMeta.
+    std::vector<int64_t> vertex_values;
+    vertex_values.reserve(2 * std::ranges::size(rs_metas));
+
+    for (const auto& rs : rs_metas) {
+        vertex_values.push_back(rs->start_version());
+        vertex_values.push_back(rs->end_version() + 1);
+        if (max_version != nullptr and *max_version < rs->end_version()) {
+            *max_version = rs->end_version();
+        }
+    }
+    std::sort(vertex_values.begin(), vertex_values.end());
+
+    // Items in `vertex_values` are sorted, but not unique.
+    // we choose unique items in `vertex_values` to create vertexes.
+    int64_t last_vertex_value = -1;
+    for (size_t i = 0; i < vertex_values.size(); ++i) {
+        if (i != 0 && vertex_values[i] == last_vertex_value) {
+            continue;
+        }
+
+        // Add vertex to graph.
+        _add_vertex_to_graph(vertex_values[i]);
+        last_vertex_value = vertex_values[i];
+    }
+    // Create edges for version graph according to TabletMeta's versions.
+    for (const auto& rs : rs_metas) {
+        // Versions in header are unique.
+        // We ensure `_vertex_index_map` has its `start_version`.
+        int64_t start_vertex_index = _vertex_index_map[rs->start_version()];
+        int64_t end_vertex_index = _vertex_index_map[rs->end_version() + 1];
+        // Add one edge from `start_version` to `end_version`.
+        _version_graph[start_vertex_index].edges.push_front(end_vertex_index);
+        // Add reverse edge from `end_version` to `start_version`.
+        _version_graph[end_vertex_index].edges.push_front(start_vertex_index);
+    }
+
+    // Sort edges by version in descending order.
+    for (auto& vertex : _version_graph) {
+        vertex.edges.sort([this](const int& vertex_idx_a, const int& 
vertex_idx_b) {
+            return _version_graph[vertex_idx_a].value > 
_version_graph[vertex_idx_b].value;
+        });
+    }
+}
+
+void 
TimestampedVersionTracker::_construct_versioned_tracker(RowsetMetaSPtrRange 
auto&& rs_metas) {
+    int64_t max_version = 0;
+
+    // construct the rowset graph
+    _version_graph.reconstruct_version_graph(rs_metas, &max_version);
+}
+
+void 
TimestampedVersionTracker::construct_versioned_tracker(RowsetMetaSPtrRange 
auto&& rs_metas) {
+    if (std::ranges::empty(rs_metas)) {
+        VLOG_NOTICE << "there is no version in the header.";
+        return;
+    }
+    _stale_version_path_map.clear();
+    _next_path_id = 1;
+    _construct_versioned_tracker(rs_metas);
+}
+
+void TimestampedVersionTracker::construct_versioned_tracker(
+        RowsetMetaSPtrRange auto&& rs_metas, RowsetMetaSPtrRange auto&& 
stale_metas) {
+    if (std::ranges::empty(rs_metas)) {
+        VLOG_NOTICE << "there is no version in the header.";
+        return;
+    }
+    _stale_version_path_map.clear();
+    _next_path_id = 1;
+    _construct_versioned_tracker(rs_metas);
+
+    // Init `_stale_version_path_map`.
+    _init_stale_version_path_map(rs_metas, stale_metas);
+}
+
+void TimestampedVersionTracker::_init_stale_version_path_map(
+        RowsetMetaSPtrRange auto&& rs_metas, RowsetMetaSPtrRange auto&& 
stale_metas) {
+    if (std::ranges::empty(stale_metas)) {
+        return;
+    }
+
+    // Sort stale meta by version diff (second version - first version).
+    std::list<RowsetMetaSharedPtr> sorted_stale_metas;
+    for (const auto& rs : stale_metas) {
+        sorted_stale_metas.emplace_back(rs);
+    }
+
+    // 1. sort the existing rowsets by version in ascending order.
+    sorted_stale_metas.sort([](const RowsetMetaSharedPtr& a, const 
RowsetMetaSharedPtr& b) {
+        // Compare by version diff between `version.first` and 
`version.second`.
+        int64_t a_diff = a->version().second - a->version().first;
+        int64_t b_diff = b->version().second - b->version().first;
+
+        int64_t diff = a_diff - b_diff;
+        if (diff < 0) {
+            return true;
+        } else if (diff > 0) {
+            return false;
+        }
+        // When the version diff is equal, compare the rowset`s stale time
+        return a->stale_at() < b->stale_at();
+    });
+
+    // first_version -> (second_version -> rowset_meta)
+    std::unordered_map<int64_t, std::unordered_map<int64_t, 
RowsetMetaSharedPtr>> stale_map;
+
+    // 2. generate stale path from stale_metas. traverse sorted_stale_metas 
and each time add stale_meta to stale_map.
+    // when a stale path in stale_map can replace stale_meta in 
sorted_stale_metas, stale_map remove rowset_metas of a stale path
+    // and add the path to `_stale_version_path_map`.
+    for (auto& stale_meta : sorted_stale_metas) {
+        std::vector<RowsetMetaSharedPtr> stale_path;
+        // 2.1 find a path in `stale_map` can replace current `stale_meta` 
version.
+        bool r = _find_path_from_stale_map(stale_map, 
stale_meta->start_version(),
+                                           stale_meta->end_version(), 
&stale_path);
+
+        // 2.2 add version to `version_graph`.
+        Version stale_meta_version = stale_meta->version();
+        add_version(stale_meta_version);
+
+        // 2.3 find the path.
+        if (r) {
+            // Add the path to `_stale_version_path_map`.
+            add_stale_path_version(stale_path);
+            // Remove `stale_path` from `stale_map`.
+            for (auto stale_item : stale_path) {
+                
stale_map[stale_item->start_version()].erase(stale_item->end_version());
+
+                if (stale_map[stale_item->start_version()].empty()) {
+                    stale_map.erase(stale_item->start_version());
+                }
+            }
+        }
+
+        // 2.4 add `stale_meta` to `stale_map`.
+        auto start_iter = stale_map.find(stale_meta->start_version());
+        if (start_iter != stale_map.end()) {
+            start_iter->second[stale_meta->end_version()] = stale_meta;
+        } else {
+            std::unordered_map<int64_t, RowsetMetaSharedPtr> item;
+            item[stale_meta->end_version()] = stale_meta;
+            stale_map[stale_meta->start_version()] = std::move(item);
+        }
+    }
+
+    // 3. generate stale path from `rs_metas`.
+    for (const auto& stale_meta : rs_metas) {
+        std::vector<RowsetMetaSharedPtr> stale_path;
+        // 3.1 find a path in stale_map can replace current `stale_meta` 
version.
+        bool r = _find_path_from_stale_map(stale_map, 
stale_meta->start_version(),
+                                           stale_meta->end_version(), 
&stale_path);
+
+        // 3.2 find the path.
+        if (r) {
+            // Add the path to `_stale_version_path_map`.
+            add_stale_path_version(stale_path);
+            // Remove `stale_path` from `stale_map`.
+            for (auto stale_item : stale_path) {
+                
stale_map[stale_item->start_version()].erase(stale_item->end_version());
+
+                if (stale_map[stale_item->start_version()].empty()) {
+                    stale_map.erase(stale_item->start_version());
+                }
+            }
+        }
+    }
+
+    // 4. process remain stale `rowset_meta` in `stale_map`.
+    auto map_iter = stale_map.begin();
+    while (map_iter != stale_map.end()) {
+        auto second_iter = map_iter->second.begin();
+        while (second_iter != map_iter->second.end()) {
+            // Each remain stale `rowset_meta` generate a stale path.
+            std::vector<RowsetMetaSharedPtr> stale_path;
+            stale_path.push_back(second_iter->second);
+            add_stale_path_version(stale_path);
+
+            second_iter++;
+        }
+        map_iter++;
+    }
+}

Review Comment:
   move functions to .cpp?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to