This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a841905  [optimization] use replace top instead of push pop in priority #5312 (#5313)
a841905 is described below

commit a84190518459b63089da72e3f6d4278d3187ac4d
Author: stdpain <[email protected]>
AuthorDate: Wed Feb 3 17:21:54 2021 -0800

    [optimization] use replace top instead of push pop in priority #5312 (#5313)
---
 be/src/exec/topn_node.cpp       |  34 +++---------
 be/src/exec/topn_node.h         |   9 +--
 be/src/util/sort_heap.h         | 120 ++++++++++++++++++++++++++++++++++++++++
 be/test/util/CMakeLists.txt     |   1 +
 be/test/util/sort_heap_test.cpp |  95 +++++++++++++++++++++++++++++++
 5 files changed, 228 insertions(+), 31 deletions(-)

diff --git a/be/src/exec/topn_node.cpp b/be/src/exec/topn_node.cpp
index 837d8bb..b47ab63 100644
--- a/be/src/exec/topn_node.cpp
+++ b/be/src/exec/topn_node.cpp
@@ -17,8 +17,6 @@
 
 #include "exec/topn_node.h"
 
-#include <gperftools/profiler.h>
-
 #include <sstream>
 
 #include "exprs/expr.h"
@@ -43,7 +41,7 @@ TopNNode::TopNNode(ObjectPool* pool, const TPlanNode& tnode, 
const DescriptorTbl
           _tuple_row_less_than(NULL),
           _tuple_pool(NULL),
           _num_rows_skipped(0),
-          _priority_queue(NULL) {}
+          _priority_queue() {}
 
 TopNNode::~TopNNode() {}
 
@@ -85,10 +83,9 @@ Status TopNNode::open(RuntimeState* state) {
     // Avoid creating them after every Reset()/Open().
     // TODO: For some reason initializing _priority_queue in Prepare() causes 
a 30% perf
     // regression. Why??
-    if (_priority_queue.get() == NULL) {
-        _priority_queue.reset(
-                new std::priority_queue<Tuple*, std::vector<Tuple*>, 
TupleRowComparator>(
-                        *_tuple_row_less_than));
+    if (_priority_queue == nullptr) {
+        _priority_queue.reset(new SortingHeap<Tuple*, std::vector<Tuple*>, 
TupleRowComparator>(
+                *_tuple_row_less_than));
     }
 
     // Allocate memory for a temporary tuple.
@@ -180,14 +177,13 @@ Status TopNNode::close(RuntimeState* state) {
 
 // Insert if either not at the limit or it's a new TopN tuple_row
 void TopNNode::insert_tuple_row(TupleRow* input_row) {
-    Tuple* insert_tuple = NULL;
-
     if (_priority_queue->size() < _offset + _limit) {
-        insert_tuple = reinterpret_cast<Tuple*>(
+        auto insert_tuple = reinterpret_cast<Tuple*>(
                 _tuple_pool->allocate(_materialized_tuple_desc->byte_size()));
         insert_tuple->materialize_exprs<false>(input_row, 
*_materialized_tuple_desc,
                                                
_sort_exec_exprs.sort_tuple_slot_expr_ctxs(),
                                                _tuple_pool.get(), NULL, NULL);
+        _priority_queue->push(insert_tuple);
     } else {
         DCHECK(!_priority_queue->empty());
         Tuple* top_tuple = _priority_queue->top();
@@ -199,27 +195,15 @@ void TopNNode::insert_tuple_row(TupleRow* input_row) {
             // TODO: DeepCopy will allocate new buffers for the string data.  
This needs
             // to be fixed to use a freelist
             _tmp_tuple->deep_copy(top_tuple, *_materialized_tuple_desc, 
_tuple_pool.get());
-            insert_tuple = top_tuple;
-            _priority_queue->pop();
+            auto insert_tuple = top_tuple;
+            _priority_queue->replace_top(insert_tuple);
         }
     }
-
-    if (insert_tuple != NULL) {
-        _priority_queue->push(insert_tuple);
-    }
 }
 
 // Reverse the order of the tuples in the priority queue
 void TopNNode::prepare_for_output() {
-    _sorted_top_n.resize(_priority_queue->size());
-    int index = _sorted_top_n.size() - 1;
-
-    while (_priority_queue->size() > 0) {
-        Tuple* tuple = _priority_queue->top();
-        _priority_queue->pop();
-        _sorted_top_n[index] = tuple;
-        --index;
-    }
+    // sorted_seq() heap-sorts the retained tuples into ascending comparator
+    // order, i.e. the same order the removed loop produced by popping the
+    // greatest tuple into the back of the vector first.
+    _sorted_top_n = _priority_queue->sorted_seq();

     _get_next_iter = _sorted_top_n.begin();
 }
diff --git a/be/src/exec/topn_node.h b/be/src/exec/topn_node.h
index 2077da1..74a7dd0 100644
--- a/be/src/exec/topn_node.h
+++ b/be/src/exec/topn_node.h
@@ -23,6 +23,7 @@
 
 #include "exec/exec_node.h"
 #include "runtime/descriptors.h"
+#include "util/sort_heap.h"
 #include "util/tuple_row_compare.h"
 
 namespace doris {
@@ -101,12 +102,8 @@ private:
     // Number of rows skipped. Used for adhering to _offset.
     int64_t _num_rows_skipped;
 
-    // The priority queue will never have more elements in it than the LIMIT.  
The stl
-    // priority queue doesn't support a max size, so to get that 
functionality, the order
-    // of the queue is the opposite of what the ORDER BY clause specifies, 
such that the top
-    // of the queue is the last sorted element.
-    boost::scoped_ptr<std::priority_queue<Tuple*, std::vector<Tuple*>, 
TupleRowComparator>>
-            _priority_queue;
+    // The priority queue will never have more elements in it than the LIMIT.  
    
+    std::unique_ptr<SortingHeap<Tuple*, std::vector<Tuple*>, 
TupleRowComparator>> _priority_queue;
 
     // END: Members that must be Reset()
     /////////////////////////////////////////
diff --git a/be/src/util/sort_heap.h b/be/src/util/sort_heap.h
new file mode 100644
index 0000000..2bbba55
--- /dev/null
+++ b/be/src/util/sort_heap.h
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <algorithm>
+#include <cassert>
+#include <queue>
+#include <utility>
+
+#include "common/compiler_util.h"
+
+namespace doris {
+
+/// A binary heap ordered by `Compare`, with the same orientation as
+/// std::priority_queue: top() is an element that no other element compares
+/// greater than. Besides push/remove_top it offers replace_top(), which
+/// overwrites the root and sifts it down once instead of doing a pop
+/// followed by a push.
+///
+/// `Sequence` must be a random-access container of `T` (e.g. std::vector<T>);
+/// `Compare(a, b)` returns true when `a` orders before `b`.
+template <typename T, typename Sequence, typename Compare>
+class SortingHeap {
+public:
+    SortingHeap(const Compare& comp) : _comp(comp) {}
+
+    /// True while the heap holds at least one element (same as !empty()).
+    bool is_valid() const { return !_queue.empty(); }
+
+    /// Returns a copy of the root element. The heap must not be empty.
+    T top() const {
+        assert(!_queue.empty());
+        return _queue.front();
+    }
+
+    size_t size() const { return _queue.size(); }
+
+    bool empty() const { return _queue.empty(); }
+
+    /// Returns the larger of the root's two children, i.e. the element that
+    /// becomes the root after remove_top(). Requires size() >= 2. Writing
+    /// through the returned reference may break the heap order.
+    T& next_child() {
+        assert(_queue.size() >= 2);
+        return _queue[_next_child_index()];
+    }
+
+    /// Overwrites the root with `new_top` and restores heap order with a
+    /// single sift-down. The heap must not be empty.
+    void replace_top(T new_top) {
+        assert(!_queue.empty());
+        _queue.front() = std::move(new_top);
+        update_top();
+    }
+
+    /// Removes the root element. The heap must not be empty.
+    void remove_top() {
+        assert(!_queue.empty());
+        std::pop_heap(_queue.begin(), _queue.end(), _comp);
+        _queue.pop_back();
+        _next_idx = 0;
+    }
+
+    /// Inserts `cursor` and restores heap order.
+    void push(T cursor) {
+        _queue.emplace_back(std::move(cursor));
+        std::push_heap(_queue.begin(), _queue.end(), _comp);
+        _next_idx = 0;
+    }
+
+    /// Returns every element sorted in ascending `Compare` order and leaves
+    /// the heap empty and ready for reuse.
+    ///
+    /// Returned by value on purpose: handing out `Sequence&&` to the member
+    /// let callers keep a reference into the heap, and a caller that did not
+    /// move from it left the heap holding sorted (non-heap) data together
+    /// with a stale child-index cache.
+    Sequence sorted_seq() {
+        std::sort_heap(_queue.begin(), _queue.end(), _comp);
+        Sequence sorted = std::move(_queue);
+        // A moved-from container is only "valid but unspecified"; turn it
+        // back into a well-defined empty heap.
+        _queue.clear();
+        _next_idx = 0;
+        return sorted;
+    }
+
+private:
+    Sequence _queue;
+    Compare _comp;
+
+    /// Cached index (1 or 2) of the root's larger child; 0 means "not yet
+    /// computed". Only valid while the order in the queue has not changed.
+    size_t _next_idx = 0;
+
+    size_t _next_child_index() {
+        if (_next_idx == 0) {
+            _next_idx = 1;
+            if (_queue.size() > 2 && _comp(_queue[1], _queue[2])) ++_next_idx;
+        }
+
+        return _next_idx;
+    }
+
+    /// Restores heap order after the root was overwritten, by sifting the
+    /// saved root value down. The larger child is moved up into the hole
+    /// until a leaf is reached or the larger remaining child orders strictly
+    /// before the saved value.
+    void update_top() {
+        const size_t size = _queue.size();
+        if (size < 2) return;
+
+        auto begin = _queue.begin();
+
+        size_t child_idx = _next_child_index();
+        auto child_it = begin + child_idx;
+
+        /// Check if we are in order.
+        if (_comp(*child_it, *begin)) return;
+
+        _next_idx = 0;
+
+        auto curr_it = begin;
+        T top = std::move(*begin);
+        do {
+            /// We are not in heap-order: move the larger child up into the hole.
+            *curr_it = std::move(*child_it);
+            curr_it = child_it;
+
+            // recompute the child based off of the updated parent
+            child_idx = 2 * child_idx + 1;
+
+            if (child_idx >= size) break;
+
+            child_it = begin + child_idx;
+
+            if ((child_idx + 1) < size && _comp(*child_it, *(child_it + 1))) {
+                /// Right child exists and is greater than left child.
+                ++child_it;
+                ++child_idx;
+            }
+
+            /// Check if we are in order.
+        } while (!(_comp(*child_it, top)));
+        *curr_it = std::move(top);
+    }
+};
+} // namespace doris
diff --git a/be/test/util/CMakeLists.txt b/be/test/util/CMakeLists.txt
index 25d3122..a26ee99 100644
--- a/be/test/util/CMakeLists.txt
+++ b/be/test/util/CMakeLists.txt
@@ -67,3 +67,4 @@ ADD_BE_TEST(trace_test)
 ADD_BE_TEST(easy_json-test)
 ADD_BE_TEST(http_channel_test)
 ADD_BE_TEST(histogram_test)
+ADD_BE_TEST(sort_heap_test)
diff --git a/be/test/util/sort_heap_test.cpp b/be/test/util/sort_heap_test.cpp
new file mode 100644
index 0000000..cafd3df
--- /dev/null
+++ b/be/test/util/sort_heap_test.cpp
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/sort_heap.h"
+
+#include <algorithm>
+#include <queue>
+#include <random>
+
+#include "gtest/gtest.h"
+
+namespace doris {
+
+// Orders ints ascending; used as the heap comparator (max-heap orientation).
+struct int_comparator {
+    bool operator()(int l, int r) const { return l < r; }
+};
+
+// TEST_F bodies are member functions of a class derived from this fixture,
+// so the state they use must be protected rather than private.
+class SortHeapTest : public testing::Test {
+public:
+    SortHeapTest() = default;
+    ~SortHeapTest() override = default;
+
+protected:
+    std::default_random_engine _re;
+    int_comparator cp;
+};
+
+TEST_F(SortHeapTest, IntBasicTest) {
+    std::priority_queue<int, std::vector<int>, int_comparator> pq(cp);
+    doris::SortingHeap<int, std::vector<int>, int_comparator> sh(cp);
+    // test default result
+    const size_t test_case_1 = 10;
+    for (size_t i = 0; i < test_case_1; ++i) {
+        int res = _re();
+        pq.push(res);
+        sh.push(res);
+    }
+    EXPECT_EQ(pq.size(), sh.size());
+    for (size_t i = 0; i < test_case_1; ++i) {
+        EXPECT_EQ(sh.top(), pq.top());
+        pq.pop();
+        sh.remove_top();
+    }
+    EXPECT_TRUE(sh.empty());
+}
+
+// sorted_seq() must return every pushed element in ascending comparator order.
+TEST_F(SortHeapTest, IntSortedSeqTest) {
+    doris::SortingHeap<int, std::vector<int>, int_comparator> sh(cp);
+    std::vector<int> expected;
+    const size_t test_case_3 = 10;
+    for (size_t i = 0; i < test_case_3; ++i) {
+        int res = _re();
+        expected.push_back(res);
+        sh.push(res);
+    }
+    std::sort(expected.begin(), expected.end(), cp);
+    std::vector<int> sorted = sh.sorted_seq();
+    EXPECT_EQ(expected, sorted);
+}
+
+TEST_F(SortHeapTest, IntReplaceTest) {
+    std::priority_queue<int, std::vector<int>, int_comparator> pq(cp);
+    doris::SortingHeap<int, std::vector<int>, int_comparator> sh(cp);
+    // test replace
+    const size_t test_case_2 = 10;
+    for (size_t i = 0; i < test_case_2; ++i) {
+        int res = _re();
+        pq.push(res);
+        sh.push(res);
+    }
+
+    // Only values smaller than the current maximum displace it, so both
+    // heaps end up holding the smallest values seen so far.
+    for (size_t i = 0; i < 2 * test_case_2; ++i) {
+        int res = _re();
+        EXPECT_EQ(sh.top(), pq.top());
+        if (res < sh.top()) {
+            sh.replace_top(res);
+            pq.pop();
+            pq.push(res);
+        }
+    }
+
+    EXPECT_EQ(sh.size(), pq.size());
+    // size_t (not int) so the bound matches the unsigned loop index.
+    const size_t container_size = sh.size();
+    for (size_t i = 0; i < container_size; ++i) {
+        EXPECT_EQ(sh.top(), pq.top());
+        pq.pop();
+        sh.remove_top();
+    }
+}
+
+} // namespace doris
+
+int main(int argc, char** argv) {
+    // Let gtest consume its own command-line flags, then run every test.
+    testing::InitGoogleTest(&argc, argv);
+    const int status = RUN_ALL_TESTS();
+    return status;
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to