This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new a841905 [optimization] use replace top instead of push pop in
priority #5312 (#5313)
a841905 is described below
commit a84190518459b63089da72e3f6d4278d3187ac4d
Author: stdpain <[email protected]>
AuthorDate: Wed Feb 3 17:21:54 2021 -0800
[optimization] use replace top instead of push pop in priority #5312 (#5313)
---
be/src/exec/topn_node.cpp | 34 +++---------
be/src/exec/topn_node.h | 9 +--
be/src/util/sort_heap.h | 120 ++++++++++++++++++++++++++++++++++++++++
be/test/util/CMakeLists.txt | 1 +
be/test/util/sort_heap_test.cpp | 95 +++++++++++++++++++++++++++++++
5 files changed, 228 insertions(+), 31 deletions(-)
diff --git a/be/src/exec/topn_node.cpp b/be/src/exec/topn_node.cpp
index 837d8bb..b47ab63 100644
--- a/be/src/exec/topn_node.cpp
+++ b/be/src/exec/topn_node.cpp
@@ -17,8 +17,6 @@
#include "exec/topn_node.h"
-#include <gperftools/profiler.h>
-
#include <sstream>
#include "exprs/expr.h"
@@ -43,7 +41,7 @@ TopNNode::TopNNode(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl
_tuple_row_less_than(NULL),
_tuple_pool(NULL),
_num_rows_skipped(0),
- _priority_queue(NULL) {}
+ _priority_queue() {}
TopNNode::~TopNNode() {}
@@ -85,10 +83,9 @@ Status TopNNode::open(RuntimeState* state) {
// Avoid creating them after every Reset()/Open().
// TODO: For some reason initializing _priority_queue in Prepare() causes a 30% perf
// regression. Why??
- if (_priority_queue.get() == NULL) {
- _priority_queue.reset(
- new std::priority_queue<Tuple*, std::vector<Tuple*>,
TupleRowComparator>(
- *_tuple_row_less_than));
+ if (_priority_queue == nullptr) {
+ _priority_queue.reset(new SortingHeap<Tuple*, std::vector<Tuple*>,
TupleRowComparator>(
+ *_tuple_row_less_than));
}
// Allocate memory for a temporary tuple.
@@ -180,14 +177,13 @@ Status TopNNode::close(RuntimeState* state) {
// Insert if either not at the limit or it's a new TopN tuple_row
void TopNNode::insert_tuple_row(TupleRow* input_row) {
- Tuple* insert_tuple = NULL;
-
if (_priority_queue->size() < _offset + _limit) {
- insert_tuple = reinterpret_cast<Tuple*>(
+ auto insert_tuple = reinterpret_cast<Tuple*>(
_tuple_pool->allocate(_materialized_tuple_desc->byte_size()));
insert_tuple->materialize_exprs<false>(input_row,
*_materialized_tuple_desc,
_sort_exec_exprs.sort_tuple_slot_expr_ctxs(),
_tuple_pool.get(), NULL, NULL);
+ _priority_queue->push(insert_tuple);
} else {
DCHECK(!_priority_queue->empty());
Tuple* top_tuple = _priority_queue->top();
@@ -199,27 +195,15 @@ void TopNNode::insert_tuple_row(TupleRow* input_row) {
// TODO: DeepCopy will allocate new buffers for the string data. This needs
// to be fixed to use a freelist
_tmp_tuple->deep_copy(top_tuple, *_materialized_tuple_desc,
_tuple_pool.get());
- insert_tuple = top_tuple;
- _priority_queue->pop();
+ auto insert_tuple = top_tuple;
+ _priority_queue->replace_top(insert_tuple);
}
}
-
- if (insert_tuple != NULL) {
- _priority_queue->push(insert_tuple);
- }
}
// Reverse the order of the tuples in the priority queue
void TopNNode::prepare_for_output() {
- _sorted_top_n.resize(_priority_queue->size());
- int index = _sorted_top_n.size() - 1;
-
- while (_priority_queue->size() > 0) {
- Tuple* tuple = _priority_queue->top();
- _priority_queue->pop();
- _sorted_top_n[index] = tuple;
- --index;
- }
+ _sorted_top_n = _priority_queue->sorted_seq();
_get_next_iter = _sorted_top_n.begin();
}
diff --git a/be/src/exec/topn_node.h b/be/src/exec/topn_node.h
index 2077da1..74a7dd0 100644
--- a/be/src/exec/topn_node.h
+++ b/be/src/exec/topn_node.h
@@ -23,6 +23,7 @@
#include "exec/exec_node.h"
#include "runtime/descriptors.h"
+#include "util/sort_heap.h"
#include "util/tuple_row_compare.h"
namespace doris {
@@ -101,12 +102,8 @@ private:
// Number of rows skipped. Used for adhering to _offset.
int64_t _num_rows_skipped;
- // The priority queue will never have more elements in it than the LIMIT. The stl
- // priority queue doesn't support a max size, so to get that functionality, the order
- // of the queue is the opposite of what the ORDER BY clause specifies, such that the top
- // of the queue is the last sorted element.
- boost::scoped_ptr<std::priority_queue<Tuple*, std::vector<Tuple*>,
TupleRowComparator>>
- _priority_queue;
+ // The priority queue will never have more elements in it than the LIMIT.
+ std::unique_ptr<SortingHeap<Tuple*, std::vector<Tuple*>,
TupleRowComparator>> _priority_queue;
// END: Members that must be Reset()
/////////////////////////////////////////
diff --git a/be/src/util/sort_heap.h b/be/src/util/sort_heap.h
new file mode 100644
index 0000000..2bbba55
--- /dev/null
+++ b/be/src/util/sort_heap.h
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <algorithm>
+#include <cassert>
+#include <queue>
+#include <utility>
+
+#include "common/compiler_util.h"
+
+namespace doris {
+
+/// Binary heap stored in a `Sequence` and ordered by `Compare`, built on the
+/// std::push_heap / std::pop_heap family. top() therefore exposes the same
+/// element a std::priority_queue with the same comparator would.
+///
+/// Besides push()/remove_top() it offers replace_top(), which overwrites the
+/// root and sifts it down in one pass instead of a pop followed by a push.
+///
+/// Preconditions (e.g. "heap is not empty") are verified with assert() only;
+/// violating them in a build without assertions is undefined behaviour.
+template <typename T, typename Sequence, typename Compare>
+class SortingHeap {
+public:
+    SortingHeap(const Compare& comp) : _comp(comp) {}
+
+    /// Returns true when the heap holds at least one element.
+    bool is_valid() const { return !_queue.empty(); }
+
+    /// Returns a copy of the root element. The heap must not be empty.
+    T top() const {
+        assert(!_queue.empty());
+        return _queue.front();
+    }
+
+    /// Number of elements currently stored.
+    size_t size() const { return _queue.size(); }
+
+    /// Returns true when no element is stored.
+    bool empty() const { return _queue.empty(); }
+
+    /// Returns the greater (per Compare) of the root's children.
+    /// Requires at least two stored elements.
+    T& next_child() {
+        assert(_queue.size() > 1);
+        return _queue[_next_child_index()];
+    }
+
+    /// Overwrites the root with `new_top` and restores heap order.
+    /// The heap must not be empty.
+    void replace_top(T new_top) {
+        assert(!_queue.empty());
+        *_queue.begin() = std::move(new_top);
+        update_top();
+    }
+
+    /// Removes the root element. The heap must not be empty.
+    void remove_top() {
+        assert(!_queue.empty());
+        std::pop_heap(_queue.begin(), _queue.end(), _comp);
+        _queue.pop_back();
+        _next_idx = 0;
+    }
+
+    /// Inserts `cursor` into the heap.
+    void push(T cursor) {
+        _queue.emplace_back(std::move(cursor));
+        std::push_heap(_queue.begin(), _queue.end(), _comp);
+        _next_idx = 0;
+    }
+
+    /// Sorts the stored elements in ascending order of Compare and returns the
+    /// underlying sequence as an rvalue so the caller can move it out.
+    /// After this call the stored elements are no longer in heap order, so the
+    /// caller is expected to take the sequence (move from the result) before
+    /// the heap is used again.
+    Sequence&& sorted_seq() {
+        std::sort_heap(_queue.begin(), _queue.end(), _comp);
+        // The cached child index described the heap layout that was just destroyed.
+        _next_idx = 0;
+        return std::move(_queue);
+    }
+
+private:
+    Sequence _queue;
+    Compare _comp;
+
+    /// Cached index of the greater child of the root; 0 means "not computed".
+    /// It stays valid as long as the order in the queue has not been changed.
+    size_t _next_idx = 0;
+
+    /// Returns the index (1 or 2) of the greater child of the root, computing and
+    /// caching it on first use. Callers must ensure that index exists.
+    size_t _next_child_index() {
+        if (_next_idx == 0) {
+            _next_idx = 1;
+            if (_queue.size() > 2 && _comp(_queue[1], _queue[2])) ++_next_idx;
+        }
+
+        return _next_idx;
+    }
+
+    /// Sifts the root down until heap order holds again.
+    void update_top() {
+        size_t size = _queue.size();
+        if (size < 2) return;
+
+        auto begin = _queue.begin();
+
+        size_t child_idx = _next_child_index();
+        auto child_it = begin + child_idx;
+
+        /// Check if we are in order.
+        if (_comp(*child_it, *begin)) return;
+
+        // The root's children are about to change, so the cache is stale.
+        _next_idx = 0;
+
+        auto curr_it = begin;
+        // Hold the displaced root aside and shift children up into the hole;
+        // it is written back exactly once, at its final position.
+        auto top = std::move(*begin);
+        do {
+            /// We are not in heap-order, swap the parent with its largest child.
+            *curr_it = std::move(*child_it);
+            curr_it = child_it;
+
+            // recompute the child based off of the updated parent
+            child_idx = 2 * child_idx + 1;
+
+            if (child_idx >= size) break;
+
+            child_it = begin + child_idx;
+
+            if ((child_idx + 1) < size && _comp(*child_it, *(child_it + 1))) {
+                /// Right child exists and is greater than left child.
+                ++child_it;
+                ++child_idx;
+            }
+
+            /// Check if we are in order.
+        } while (!(_comp(*child_it, top)));
+        *curr_it = std::move(top);
+    }
+};
+} // namespace doris
diff --git a/be/test/util/CMakeLists.txt b/be/test/util/CMakeLists.txt
index 25d3122..a26ee99 100644
--- a/be/test/util/CMakeLists.txt
+++ b/be/test/util/CMakeLists.txt
@@ -67,3 +67,4 @@ ADD_BE_TEST(trace_test)
ADD_BE_TEST(easy_json-test)
ADD_BE_TEST(http_channel_test)
ADD_BE_TEST(histogram_test)
+ADD_BE_TEST(sort_heap_test)
diff --git a/be/test/util/sort_heap_test.cpp b/be/test/util/sort_heap_test.cpp
new file mode 100644
index 0000000..cafd3df
--- /dev/null
+++ b/be/test/util/sort_heap_test.cpp
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/sort_heap.h"
+
+#include <algorithm>
+#include <queue>
+#include <random>
+
+#include "gtest/gtest.h"
+
+namespace doris {
+
+// Strict "less than" ordering on ints, used as the heap comparator in the tests.
+struct int_comparator {
+    // const so the comparator can also be invoked through a const instance.
+    bool operator()(int l, int r) const { return l < r; }
+};
+
+// Fixture shared by the SortingHeap tests: supplies a default-constructed random
+// engine and the comparator handed to both containers under test.
+class SortHeapTest : public testing::Test {
+public:
+    SortHeapTest() = default;
+    ~SortHeapTest() override = default;
+
+protected:
+    // protected rather than private: TEST_F bodies are members of classes derived
+    // from this fixture and read these fields directly.
+    std::default_random_engine _re;
+    int_comparator cp;
+};
+
+// Pushing the same values into std::priority_queue and SortingHeap must expose
+// the same top element at every step while both are drained.
+TEST_F(SortHeapTest, IntBasicTest) {
+    std::priority_queue<int, std::vector<int>, int_comparator> pq(cp);
+    doris::SortingHeap<int, std::vector<int>, int_comparator> sh(cp);
+    // Fill both containers with identical pseudo-random values.
+    const size_t test_case_1 = 10;
+    for (size_t i = 0; i < test_case_1; ++i) {
+        const int res = static_cast<int>(_re());
+        pq.push(res);
+        sh.push(res);
+    }
+    EXPECT_EQ(pq.size(), sh.size());
+    // Drain both and compare the top element before every removal.
+    for (size_t i = 0; i < test_case_1; ++i) {
+        EXPECT_EQ(sh.top(), pq.top());
+        pq.pop();
+        sh.remove_top();
+    }
+    EXPECT_TRUE(pq.empty());
+    EXPECT_TRUE(sh.empty());
+}
+
+// replace_top() on SortingHeap must be equivalent to pop() followed by push()
+// on std::priority_queue.
+TEST_F(SortHeapTest, IntReplaceTest) {
+    std::priority_queue<int, std::vector<int>, int_comparator> pq(cp);
+    doris::SortingHeap<int, std::vector<int>, int_comparator> sh(cp);
+    // Fill both containers with identical pseudo-random values.
+    const size_t test_case_2 = 10;
+    for (size_t i = 0; i < test_case_2; ++i) {
+        const int res = static_cast<int>(_re());
+        pq.push(res);
+        sh.push(res);
+    }
+
+    // Keep only the smallest values seen so far: a new value displaces the
+    // current top whenever it compares less than it.
+    for (size_t i = 0; i < 2 * test_case_2; ++i) {
+        const int res = static_cast<int>(_re());
+        EXPECT_EQ(sh.top(), pq.top());
+        if (res < sh.top()) {
+            sh.replace_top(res);
+            pq.pop();
+            pq.push(res);
+        }
+    }
+
+    EXPECT_EQ(sh.size(), pq.size());
+    // Drain both and compare the top element before every removal.
+    const size_t container_size = sh.size();
+    for (size_t i = 0; i < container_size; ++i) {
+        EXPECT_EQ(sh.top(), pq.top());
+        pq.pop();
+        sh.remove_top();
+    }
+    EXPECT_TRUE(pq.empty());
+    EXPECT_TRUE(sh.empty());
+}
+
+} // namespace doris
+
+// Test binary entry point: lets googletest consume its command-line flags, then
+// runs every registered test and reports the outcome as the process exit code.
+int main(int argc, char** argv) {
+    ::testing::InitGoogleTest(&argc, argv);
+    const int exit_code = RUN_ALL_TESTS();
+    return exit_code;
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]