This is an automated email from the ASF dual-hosted git repository.

yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c0a98e  [BE] Optimize version retrieval efficiency. (#5831)
4c0a98e is described below

commit 4c0a98e8bf27926544d912c0a5b12bbbe12d339a
Author: 曹建华 <[email protected]>
AuthorDate: Wed Jun 2 09:58:21 2021 +0800

    [BE] Optimize version retrieval efficiency. (#5831)
    
    * [BE] Optimize version retrieval efficiency in high-frequency 
import/compaction scenarios.
    
    * Jump out of the loop when encountering the reverse edge.
---
 be/src/olap/version_graph.cpp | 136 +++++++++++++++++++-----------------------
 be/src/olap/version_graph.h   |   1 +
 2 files changed, 62 insertions(+), 75 deletions(-)

diff --git a/be/src/olap/version_graph.cpp b/be/src/olap/version_graph.cpp
index 79380f6..ab9c901 100644
--- a/be/src/olap/version_graph.cpp
+++ b/be/src/olap/version_graph.cpp
@@ -452,6 +452,13 @@ void VersionGraph::construct_version_graph(const 
std::vector<RowsetMetaSharedPtr
         // Add reverse edge from end_version to start_version.
         _version_graph[end_vertex_index].edges.push_front(start_vertex_index);
     }
+
+    // Sort edges by version in descending order.
+    for (auto& vertex : _version_graph) {
+        vertex.edges.sort([this](const int& vertex_idx_a, const int& 
vertex_idx_b) {
+            return _version_graph[vertex_idx_a].value > 
_version_graph[vertex_idx_b].value;
+        });
+    }
 }
 
 void VersionGraph::reconstruct_version_graph(const 
std::vector<RowsetMetaSharedPtr>& rs_metas,
@@ -476,10 +483,26 @@ void VersionGraph::add_version_to_graph(const Version& 
version) {
 
     // We assume this version is new version, so we just add two edges
     // into version graph. add one edge from start_version to end_version
-    _version_graph[start_vertex_index].edges.push_front(end_vertex_index);
+    // Make sure the vertex's edges are sorted by version in descending order 
when inserting.
+    auto end_vertex_it = _version_graph[start_vertex_index].edges.begin();
+    while (end_vertex_it != _version_graph[start_vertex_index].edges.end()) {
+        if (_version_graph[*end_vertex_it].value < 
_version_graph[end_vertex_index].value) {
+            break;
+        }
+        end_vertex_it++;
+    }
+    _version_graph[start_vertex_index].edges.insert(end_vertex_it, 
end_vertex_index);
 
     // We add reverse edge(from end_version to start_version) to graph
-    _version_graph[end_vertex_index].edges.push_front(start_vertex_index);
+    // Make sure the vertex's edges are sorted by version in descending order 
when inserting.
+    auto start_vertex_it = _version_graph[end_vertex_index].edges.begin();
+    while (start_vertex_it != _version_graph[end_vertex_index].edges.end()) {
+        if (_version_graph[*start_vertex_it].value < 
_version_graph[start_vertex_index].value) {
+            break;
+        }
+        start_vertex_it++;
+    }
+    _version_graph[end_vertex_index].edges.insert(start_vertex_it, 
start_vertex_index);
 }
 
 OLAPStatus VersionGraph::delete_version_from_graph(const Version& version) {
@@ -537,95 +560,58 @@ OLAPStatus 
VersionGraph::capture_consistent_versions(const Version& spec_version
         return OLAP_ERR_INPUT_PARAMETER_ERROR;
     }
 
-    // bfs_queue's element is vertex_index.
-    std::queue<int64_t> bfs_queue;
-    // predecessor[i] means the predecessor of vertex_index 'i'.
-    std::vector<int64_t> predecessor(_version_graph.size());
-    // visited[int64_t]==true means it had entered bfs_queue.
-    std::vector<bool> visited(_version_graph.size());
-    // [start_vertex_value, end_vertex_value)
-    int64_t start_vertex_value = spec_version.first;
-    int64_t end_vertex_value = spec_version.second + 1;
-    // -1 is invalid vertex index.
-    int64_t start_vertex_index = -1;
-    // -1 is valid vertex index.
-    int64_t end_vertex_index = -1;
-
-    for (size_t i = 0; i < _version_graph.size(); ++i) {
-        if (_version_graph[i].value == start_vertex_value) {
-            start_vertex_index = i;
-        }
-        if (_version_graph[i].value == end_vertex_value) {
-            end_vertex_index = i;
+    int64_t cur_idx = -1;
+    for (size_t i = 0; i < _version_graph.size(); i++) {
+        if (_version_graph[i].value == spec_version.first) {
+            cur_idx = i;
+            break;
         }
     }
 
-    if (start_vertex_index < 0 || end_vertex_index < 0) {
-        LOG(WARNING) << "fail to find path in version_graph. "
+    if (cur_idx < 0) {
+        LOG(WARNING) << "failed to find path in version_graph. "
                      << "spec_version: " << spec_version.first << "-" << 
spec_version.second;
         return OLAP_ERR_VERSION_NOT_EXIST;
     }
 
-    for (size_t i = 0; i < _version_graph.size(); ++i) {
-        visited[i] = false;
-    }
-
-    bfs_queue.push(start_vertex_index);
-    visited[start_vertex_index] = true;
-    // The predecessor of root is itself.
-    predecessor[start_vertex_index] = start_vertex_index;
-
-    while (bfs_queue.empty() == false && visited[end_vertex_index] == false) {
-        int64_t top_vertex_index = bfs_queue.front();
-        bfs_queue.pop();
-        for (const auto& it : _version_graph[top_vertex_index].edges) {
-            if (visited[it] == false) {
-                // If we don't support reverse version in the path, and start 
vertex
-                // value is larger than the end vertex value, we skip this 
edge.
-                if (_version_graph[top_vertex_index].value > 
_version_graph[it].value) {
-                    continue;
-                }
-                visited[it] = true;
-                predecessor[it] = top_vertex_index;
-                bfs_queue.push(it);
+    int64_t end_value = spec_version.second + 1;
+    while (_version_graph[cur_idx].value < end_value) {
+        int64_t next_idx = -1;
+        for (const auto& it : _version_graph[cur_idx].edges) {
+            // Only consider incremental versions
+            if (_version_graph[it].value < _version_graph[cur_idx].value) {
+                break;
             }
-        }
-    }
 
-    if (!visited[end_vertex_index]) {
-        LOG(WARNING) << "fail to find path in version_graph. "
-                     << "spec_version: " << spec_version.first << "-" << 
spec_version.second;
-        return OLAP_ERR_VERSION_NOT_EXIST;
-    }
+            if (_version_graph[it].value > end_value) {
+                continue;
+            }
 
-    std::vector<int64_t> reversed_path;
-    int64_t tmp_vertex_index = end_vertex_index;
-    reversed_path.push_back(tmp_vertex_index);
+            // Since the edges are sorted by version in descending 
order,
+            // this is the largest version that does not exceed 
end_version.
+            next_idx = it;
+            break;
+        }
 
-    // For start_vertex_index, its predecessor must be itself.
-    while (predecessor[tmp_vertex_index] != tmp_vertex_index) {
-        tmp_vertex_index = predecessor[tmp_vertex_index];
-        reversed_path.push_back(tmp_vertex_index);
+        if (next_idx > -1) {
+            if (version_path != nullptr) {
+                version_path->emplace_back(_version_graph[cur_idx].value, 
_version_graph[next_idx].value - 1);
+            }
+            cur_idx = next_idx;
+        } else {
+            LOG(WARNING) << "fail to find path in version_graph. "
+                         << "spec_version: " << spec_version.first << "-" << 
spec_version.second;
+            return OLAP_ERR_VERSION_NOT_EXIST;
+        }
     }
 
-    if (version_path != nullptr) {
-        // Make version_path from reversed_path.
+    if (VLOG_TRACE_IS_ON && version_path != nullptr) {
         std::stringstream shortest_path_for_debug;
-        for (size_t path_id = reversed_path.size() - 1; path_id > 0; 
--path_id) {
-            int64_t tmp_start_vertex_value = 
_version_graph[reversed_path[path_id]].value;
-            int64_t tmp_end_vertex_value = 
_version_graph[reversed_path[path_id - 1]].value;
-
-            // tmp_start_vertex_value mustn't be equal to tmp_end_vertex_value
-            if (tmp_start_vertex_value <= tmp_end_vertex_value) {
-                version_path->emplace_back(tmp_start_vertex_value, 
tmp_end_vertex_value - 1);
-            } else {
-                version_path->emplace_back(tmp_end_vertex_value, 
tmp_start_vertex_value - 1);
-            }
-
-            shortest_path_for_debug << (*version_path)[version_path->size() - 
1] << ' ';
+        for (const auto& version : *version_path) {
+            shortest_path_for_debug << version << ' ';
         }
         VLOG_TRACE << "success to find path for spec_version. spec_version=" 
<< spec_version
-                 << ", path=" << shortest_path_for_debug.str();
+                   << ", path=" << shortest_path_for_debug.str();
     }
 
     return OLAP_SUCCESS;
diff --git a/be/src/olap/version_graph.h b/be/src/olap/version_graph.h
index d5ef70e..4b8c42e 100644
--- a/be/src/olap/version_graph.h
+++ b/be/src/olap/version_graph.h
@@ -60,6 +60,7 @@ private:
     // vertex's value is version.start_version, the other is
     // version.end_version + 1.
     // Use adjacency list to describe version graph.
+    // In order to speed up the version capture, vertex's edges are sorted by 
version in descending order.
     std::vector<Vertex> _version_graph;
 
     // vertex value --> vertex_index of _version_graph

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to