This is an automated email from the ASF dual-hosted git repository.
yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4c0a98e [BE] Optimize version retrieval efficiency. (#5831)
4c0a98e is described below
commit 4c0a98e8bf27926544d912c0a5b12bbbe12d339a
Author: 曹建华 <[email protected]>
AuthorDate: Wed Jun 2 09:58:21 2021 +0800
[BE] Optimize version retrieval efficiency. (#5831)
* [FE] Optimize version retrieval efficiency in high-frequency
import/compaction scenarios.
* Jump out of the loop when encountering the reverse edge.
---
be/src/olap/version_graph.cpp | 136 +++++++++++++++++++-----------------------
be/src/olap/version_graph.h | 1 +
2 files changed, 62 insertions(+), 75 deletions(-)
diff --git a/be/src/olap/version_graph.cpp b/be/src/olap/version_graph.cpp
index 79380f6..ab9c901 100644
--- a/be/src/olap/version_graph.cpp
+++ b/be/src/olap/version_graph.cpp
@@ -452,6 +452,13 @@ void VersionGraph::construct_version_graph(const
std::vector<RowsetMetaSharedPtr
// Add reverse edge from end_version to start_version.
_version_graph[end_vertex_index].edges.push_front(start_vertex_index);
}
+
+ // Sort edges by version in descending order.
+ for (auto& vertex : _version_graph) {
+ vertex.edges.sort([this](const int& vertex_idx_a, const int&
vertex_idx_b) {
+ return _version_graph[vertex_idx_a].value >
_version_graph[vertex_idx_b].value;
+ });
+ }
}
void VersionGraph::reconstruct_version_graph(const
std::vector<RowsetMetaSharedPtr>& rs_metas,
@@ -476,10 +483,26 @@ void VersionGraph::add_version_to_graph(const Version&
version) {
// We assume this version is new version, so we just add two edges
// into version graph. add one edge from start_version to end_version
- _version_graph[start_vertex_index].edges.push_front(end_vertex_index);
+ // Make sure the vertex's edges are sorted by version in descending order
when inserting.
+ auto end_vertex_it = _version_graph[start_vertex_index].edges.begin();
+ while (end_vertex_it != _version_graph[start_vertex_index].edges.end()) {
+ if (_version_graph[*end_vertex_it].value <
_version_graph[end_vertex_index].value) {
+ break;
+ }
+ end_vertex_it++;
+ }
+ _version_graph[start_vertex_index].edges.insert(end_vertex_it,
end_vertex_index);
// We add reverse edge(from end_version to start_version) to graph
- _version_graph[end_vertex_index].edges.push_front(start_vertex_index);
+ // Make sure the vertex's edges are sorted by version in descending order
when inserting.
+ auto start_vertex_it = _version_graph[end_vertex_index].edges.begin();
+ while (start_vertex_it != _version_graph[end_vertex_index].edges.end()) {
+ if (_version_graph[*start_vertex_it].value <
_version_graph[start_vertex_index].value) {
+ break;
+ }
+ start_vertex_it++;
+ }
+ _version_graph[end_vertex_index].edges.insert(start_vertex_it,
start_vertex_index);
}
OLAPStatus VersionGraph::delete_version_from_graph(const Version& version) {
@@ -537,95 +560,58 @@ OLAPStatus
VersionGraph::capture_consistent_versions(const Version& spec_version
return OLAP_ERR_INPUT_PARAMETER_ERROR;
}
- // bfs_queue's element is vertex_index.
- std::queue<int64_t> bfs_queue;
- // predecessor[i] means the predecessor of vertex_index 'i'.
- std::vector<int64_t> predecessor(_version_graph.size());
- // visited[int64_t]==true means it had entered bfs_queue.
- std::vector<bool> visited(_version_graph.size());
- // [start_vertex_value, end_vertex_value)
- int64_t start_vertex_value = spec_version.first;
- int64_t end_vertex_value = spec_version.second + 1;
- // -1 is invalid vertex index.
- int64_t start_vertex_index = -1;
- // -1 is valid vertex index.
- int64_t end_vertex_index = -1;
-
- for (size_t i = 0; i < _version_graph.size(); ++i) {
- if (_version_graph[i].value == start_vertex_value) {
- start_vertex_index = i;
- }
- if (_version_graph[i].value == end_vertex_value) {
- end_vertex_index = i;
+ int64_t cur_idx = -1;
+ for (size_t i = 0; i < _version_graph.size(); i++) {
+ if (_version_graph[i].value == spec_version.first) {
+ cur_idx = i;
+ break;
}
}
- if (start_vertex_index < 0 || end_vertex_index < 0) {
- LOG(WARNING) << "fail to find path in version_graph. "
+ if (cur_idx < 0) {
+ LOG(WARNING) << "failed to find path in version_graph. "
<< "spec_version: " << spec_version.first << "-" <<
spec_version.second;
return OLAP_ERR_VERSION_NOT_EXIST;
}
- for (size_t i = 0; i < _version_graph.size(); ++i) {
- visited[i] = false;
- }
-
- bfs_queue.push(start_vertex_index);
- visited[start_vertex_index] = true;
- // The predecessor of root is itself.
- predecessor[start_vertex_index] = start_vertex_index;
-
- while (bfs_queue.empty() == false && visited[end_vertex_index] == false) {
- int64_t top_vertex_index = bfs_queue.front();
- bfs_queue.pop();
- for (const auto& it : _version_graph[top_vertex_index].edges) {
- if (visited[it] == false) {
- // If we don't support reverse version in the path, and start
vertex
- // value is larger than the end vertex value, we skip this
edge.
- if (_version_graph[top_vertex_index].value >
_version_graph[it].value) {
- continue;
- }
- visited[it] = true;
- predecessor[it] = top_vertex_index;
- bfs_queue.push(it);
+ int64_t end_value = spec_version.second + 1;
+ while (_version_graph[cur_idx].value < end_value) {
+ int64_t next_idx = -1;
+ for (const auto& it : _version_graph[cur_idx].edges) {
+ // Only consider incremental versions
+ if (_version_graph[it].value < _version_graph[cur_idx].value) {
+ break;
}
- }
- }
- if (!visited[end_vertex_index]) {
- LOG(WARNING) << "fail to find path in version_graph. "
- << "spec_version: " << spec_version.first << "-" <<
spec_version.second;
- return OLAP_ERR_VERSION_NOT_EXIST;
- }
+ if (_version_graph[it].value > end_value) {
+ continue;
+ }
- std::vector<int64_t> reversed_path;
- int64_t tmp_vertex_index = end_vertex_index;
- reversed_path.push_back(tmp_vertex_index);
+ // Considering edges had been sorted by version in descending
order,
+ // This version is the largest version that smaller than
end_version.
+ next_idx = it;
+ break;
+ }
- // For start_vertex_index, its predecessor must be itself.
- while (predecessor[tmp_vertex_index] != tmp_vertex_index) {
- tmp_vertex_index = predecessor[tmp_vertex_index];
- reversed_path.push_back(tmp_vertex_index);
+ if (next_idx > -1) {
+ if (version_path != nullptr) {
+ version_path->emplace_back(_version_graph[cur_idx].value,
_version_graph[next_idx].value - 1);
+ }
+ cur_idx = next_idx;
+ } else {
+ LOG(WARNING) << "fail to find path in version_graph. "
+ << "spec_version: " << spec_version.first << "-" <<
spec_version.second;
+ return OLAP_ERR_VERSION_NOT_EXIST;
+ }
}
- if (version_path != nullptr) {
- // Make version_path from reversed_path.
+ if (VLOG_TRACE_IS_ON && version_path != nullptr) {
std::stringstream shortest_path_for_debug;
- for (size_t path_id = reversed_path.size() - 1; path_id > 0;
--path_id) {
- int64_t tmp_start_vertex_value =
_version_graph[reversed_path[path_id]].value;
- int64_t tmp_end_vertex_value =
_version_graph[reversed_path[path_id - 1]].value;
-
- // tmp_start_vertex_value mustn't be equal to tmp_end_vertex_value
- if (tmp_start_vertex_value <= tmp_end_vertex_value) {
- version_path->emplace_back(tmp_start_vertex_value,
tmp_end_vertex_value - 1);
- } else {
- version_path->emplace_back(tmp_end_vertex_value,
tmp_start_vertex_value - 1);
- }
-
- shortest_path_for_debug << (*version_path)[version_path->size() -
1] << ' ';
+ for (const auto& version : *version_path) {
+ shortest_path_for_debug << version << ' ';
}
VLOG_TRACE << "success to find path for spec_version. spec_version="
<< spec_version
- << ", path=" << shortest_path_for_debug.str();
+ << ", path=" << shortest_path_for_debug.str();
}
return OLAP_SUCCESS;
diff --git a/be/src/olap/version_graph.h b/be/src/olap/version_graph.h
index d5ef70e..4b8c42e 100644
--- a/be/src/olap/version_graph.h
+++ b/be/src/olap/version_graph.h
@@ -60,6 +60,7 @@ private:
// vertex's value is version.start_version, the other is
// version.end_version + 1.
// Use adjacency list to describe version graph.
+ // In order to speed up the version capture, vertex's edges are sorted by
version in descending order.
std::vector<Vertex> _version_graph;
// vertex value --> vertex_index of _version_graph
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]