This is an automated email from the ASF dual-hosted git repository.

lichaoyong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f4c03fe  1. Delete the code of Sort Node we do not use now. (#3666)
f4c03fe is described below

commit f4c03fe8e28ce2fd8ba6a969c5a16ad02c010dce
Author: HappenLee <[email protected]>
AuthorDate: Tue May 26 10:20:57 2020 +0800

    1. Delete the code of Sort Node we do not use now. (#3666)
    
    Optimize the quicksort by choosing its pivot with find_the_median (median of three),
    and reduce the recursion depth of the quicksort by recursing on the smaller partition.
---
 be/src/exec/CMakeLists.txt     |   1 -
 be/src/exec/exec_node.cpp      |   1 -
 be/src/exec/sort_node.cpp      | 156 -----------------------------------------
 be/src/exec/sort_node.h        |  76 --------------------
 be/src/runtime/spill_sorter.cc |  65 +++++++++++++----
 5 files changed, 50 insertions(+), 249 deletions(-)

diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 4844338..c42afce 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -49,7 +49,6 @@ set(EXEC_FILES
     text_converter.cpp
     topn_node.cpp
     sort_exec_exprs.cpp
-    sort_node.cpp
     olap_rewrite_node.cpp
     olap_scan_node.cpp
     olap_scanner.cpp
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 9a6f208..226e42a 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -45,7 +45,6 @@
 #include "exec/repeat_node.h"
 #include "exec/schema_scan_node.h"
 #include "exec/select_node.h"
-#include "exec/sort_node.h"
 #include "exec/spill_sort_node.h"
 #include "exec/topn_node.h"
 #include "exec/union_node.h"
diff --git a/be/src/exec/sort_node.cpp b/be/src/exec/sort_node.cpp
deleted file mode 100644
index da1d9a3..0000000
--- a/be/src/exec/sort_node.cpp
+++ /dev/null
@@ -1,156 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "exec/sort_node.h"
-#include "exec/sort_exec_exprs.h"
-#include "runtime/row_batch.h"
-#include "runtime/runtime_state.h"
-#include "util/runtime_profile.h"
-
-namespace doris {
-
-SortNode::SortNode(ObjectPool* pool, const TPlanNode& tnode, const 
DescriptorTbl& descs)
-    : ExecNode(pool, tnode, descs),
-      _offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0),
-      _num_rows_skipped(0) {
-    Status status = init(tnode, nullptr);
-    DCHECK(status.ok()) << "SortNode c'tor:init failed: \n" << 
status.get_error_msg();
-}
-
-SortNode::~SortNode() {
-}
-
-Status SortNode::init(const TPlanNode& tnode, RuntimeState* state) {
-    const vector<TExpr>* sort_tuple_slot_exprs =  
tnode.sort_node.__isset.sort_tuple_slot_exprs ?
-            &tnode.sort_node.sort_tuple_slot_exprs : NULL;
-    RETURN_IF_ERROR(_sort_exec_exprs.init(tnode.sort_node.ordering_exprs,
-            sort_tuple_slot_exprs, _pool));
-    _is_asc_order = tnode.sort_node.is_asc_order;
-    _nulls_first = tnode.sort_node.nulls_first;
-    return Status::OK();
-}
-
-Status SortNode::prepare(RuntimeState* state) {
-    SCOPED_TIMER(_runtime_profile->total_time_counter());
-    RETURN_IF_ERROR(ExecNode::prepare(state));
-    RETURN_IF_ERROR(_sort_exec_exprs.prepare(
-            state, child(0)->row_desc(), _row_descriptor, expr_mem_tracker()));
-    return Status::OK();
-}
-
-Status SortNode::open(RuntimeState* state) {
-    SCOPED_TIMER(_runtime_profile->total_time_counter());
-    RETURN_IF_ERROR(ExecNode::open(state));
-    RETURN_IF_ERROR(_sort_exec_exprs.open(state));
-    RETURN_IF_CANCELLED(state);
-    RETURN_IF_ERROR(child(0)->open(state));
-
-    TupleRowComparator less_than(
-        _sort_exec_exprs.lhs_ordering_expr_ctxs(), 
_sort_exec_exprs.rhs_ordering_expr_ctxs(),
-        _is_asc_order, _nulls_first);
-    _sorter.reset(new MergeSorter(
-                      less_than, _sort_exec_exprs.sort_tuple_slot_expr_ctxs(),
-                      &_row_descriptor, runtime_profile(), state));
-
-    // The child has been opened and the sorter created. Sort the input.
-    // The final merge is done on-demand as rows are requested in GetNext().
-    RETURN_IF_ERROR(sort_input(state));
-
-    // The child can be closed at this point.
-    child(0)->close(state);
-    return Status::OK();
-}
-
-Status SortNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) 
{
-    SCOPED_TIMER(_runtime_profile->total_time_counter());
-    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
-    RETURN_IF_CANCELLED(state);
-    //RETURN_IF_ERROR(QueryMaintenance(state));
-
-    if (reached_limit()) {
-        *eos = true;
-        return Status::OK();
-    } else {
-        *eos = false;
-    }
-
-    DCHECK_EQ(row_batch->num_rows(), 0);
-    RETURN_IF_ERROR(_sorter->get_next(row_batch, eos));
-    while ((_num_rows_skipped < _offset)) {
-        _num_rows_skipped += row_batch->num_rows();
-        // Throw away rows in the output batch until the offset is skipped.
-        int rows_to_keep = _num_rows_skipped - _offset;
-        if (rows_to_keep > 0) {
-            row_batch->copy_rows(0, row_batch->num_rows() - rows_to_keep, 
rows_to_keep);
-            row_batch->set_num_rows(rows_to_keep);
-        } else {
-            row_batch->set_num_rows(0);
-        }
-        if (rows_to_keep > 0 || *eos) {
-            break;
-        }
-        RETURN_IF_ERROR(_sorter->get_next(row_batch, eos));
-    }
-
-    _num_rows_returned += row_batch->num_rows();
-    if (reached_limit()) {
-        row_batch->set_num_rows(row_batch->num_rows() - (_num_rows_returned - 
_limit));
-        *eos = true;
-    }
-
-    COUNTER_SET(_rows_returned_counter, _num_rows_returned);
-    return Status::OK();
-}
-
-Status SortNode::close(RuntimeState* state) {
-    if (is_closed()) {
-        return Status::OK();
-    }
-    _sort_exec_exprs.close(state);
-    _sorter.reset();
-    return ExecNode::close(state);
-}
-
-void SortNode::debug_string(int indentation_level, stringstream* out) const {
-    *out << string(indentation_level * 2, ' ');
-    *out << "SortNode(";
-         // << Expr::debug_string(_sort_exec_exprs.lhs_ordering_expr_ctxs());
-    for (int i = 0; i < _is_asc_order.size(); ++i) {
-        *out << (i > 0 ? " " : "")
-             << (_is_asc_order[i] ? "asc" : "desc")
-             << " nulls " << (_nulls_first[i] ? "first" : "last");
-    }
-    ExecNode::debug_string(indentation_level, out);
-    *out << ")";
-}
-
-Status SortNode::sort_input(RuntimeState* state) {
-    RowBatch batch(child(0)->row_desc(), state->batch_size(), mem_tracker());
-    bool eos = false;
-    do {
-        batch.reset();
-        RETURN_IF_ERROR(child(0)->get_next(state, &batch, &eos));
-        RETURN_IF_ERROR(_sorter->add_batch(&batch));
-        RETURN_IF_CANCELLED(state);
-        RETURN_IF_LIMIT_EXCEEDED(state, "Sort, while getting next from the 
child.");
-        // RETURN_IF_ERROR(QueryMaintenance(state));
-    } while (!eos);
-    RETURN_IF_ERROR(_sorter->input_done());
-    return Status::OK();
-}
-
-}
diff --git a/be/src/exec/sort_node.h b/be/src/exec/sort_node.h
deleted file mode 100644
index 2d5b1d8..0000000
--- a/be/src/exec/sort_node.h
+++ /dev/null
@@ -1,76 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef INF_DORIS_QE_SRC_BE_EXEC_SORT_NODE_H
-#define INF_DORIS_QE_SRC_BE_EXEC_SORT_NODE_H
-
-#include "exec/exec_node.h"
-#include "exec/sort_exec_exprs.h"
-#include "runtime/merge_sorter.h"
-#include "runtime/buffered_block_mgr.h"
-
-namespace doris {
-
-// Node that implements a full sort of its input with a fixed memory budget, 
spilling
-// to disk if the input is larger than available memory.
-// Uses Sorter and BufferedBlockMgr for the external sort implementation.
-// Input rows to SortNode are materialized by the Sorter into a single tuple
-// using the expressions specified in sort_exec_exprs_.
-// In GetNext(), SortNode passes in the output batch to the sorter instance 
created
-// in Open() to fill it with sorted rows.
-// If a merge phase was performed in the sort, sorted rows are deep copied into
-// the output batch. Otherwise, the sorter instance owns the sorted data.
-class SortNode : public ExecNode {
-public:
-    SortNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& 
descs);
-    ~SortNode();
-
-    virtual Status prepare(RuntimeState* state);
-    virtual Status open(RuntimeState* state);
-    virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* 
eos);
-    virtual Status close(RuntimeState* state);
-
-protected:
-    virtual void debug_string(int indentation_level, std::stringstream* out) 
const;
-
-private:
-    Status init(const TPlanNode& tnode, RuntimeState* state = nullptr);
-    // Fetch input rows and feed them to the sorter until the input is 
exhausted.
-    Status sort_input(RuntimeState* state);
-
-    // Create a block manager object and set it in block_mgr_.
-    // Returns and sets the query status to 
Status::MemoryLimitExceeded("Memory limit exceeded") if there is not
-    // enough memory for the sort.
-    Status create_block_mgr(RuntimeState* state);
-
-    // Number of rows to skip.
-    int64_t _offset;
-    int64_t _num_rows_skipped;
-
-    // Object used for external sorting.
-    boost::scoped_ptr<MergeSorter> _sorter;
-
-    // Expressions and parameters used for tuple materialization and tuple 
comparison.
-    SortExecExprs _sort_exec_exprs;
-    std::vector<bool> _is_asc_order;
-    std::vector<bool> _nulls_first;
-    boost::scoped_ptr<MemPool> _tuple_pool;
-};
-
-}
-
-#endif
diff --git a/be/src/runtime/spill_sorter.cc b/be/src/runtime/spill_sorter.cc
index 0a5b3ff..4456bc0 100644
--- a/be/src/runtime/spill_sorter.cc
+++ b/be/src/runtime/spill_sorter.cc
@@ -364,11 +364,16 @@ private:
     void insertion_sort(const TupleIterator& first, const TupleIterator& last);
 
     // Partitions the sequence of tuples in the range [first, last) in a run 
into two
-    // groups around the pivot tuple - i.e. tuples in first group are <= the 
pivot, and
-    // tuples in the second group are >= pivot. Tuples are swapped in place to 
create the
+    // groups around the mid._current_tuple - i.e. tuples in first group are 
<= the mid._current_tuple
+    // and tuples in the second group are >= mid._current_tuple. Tuples are 
swapped in place to create the
     // groups and the index to the first element in the second group is 
returned.
     // Checks _state->is_cancelled() and returns early with an invalid result 
if true.
-    TupleIterator partition(TupleIterator first, TupleIterator last, Tuple* 
pivot);
+    TupleIterator partition(TupleIterator first, TupleIterator last, 
TupleIterator& mid);
+
+    // Select the median of three iterator tuples. taking the median tends to 
help us select better
+    // pivots that more evenly split the input range. This method makes 
selection of
+    // bad pivots very infrequent.
+    void find_the_median(TupleIterator& first, TupleIterator& last, 
TupleIterator& mid);
 
     // Performs a quicksort of rows in the range [first, last) followed by 
insertion sort
     // for smaller groups of elements.
@@ -931,12 +936,34 @@ void SpillSorter::TupleSorter::insertion_sort(const 
TupleIterator& first,
     }
 }
 
+void SpillSorter::TupleSorter::find_the_median(TupleSorter::TupleIterator 
&first,
+        TupleSorter::TupleIterator &last, TupleSorter::TupleIterator &mid) {
+    last.prev();
+    auto f_com_result = 
_less_than_comp.compare(reinterpret_cast<TupleRow*>(&first._current_tuple), 
reinterpret_cast<TupleRow*>(&mid._current_tuple));
+    auto l_com_result = 
_less_than_comp.compare(reinterpret_cast<TupleRow*>(&last._current_tuple), 
reinterpret_cast<TupleRow*>(&mid._current_tuple));
+    if (f_com_result == -1 && l_com_result == -1) {
+        if 
(_less_than_comp(reinterpret_cast<TupleRow*>(&first._current_tuple),reinterpret_cast<TupleRow*>(&last._current_tuple)))
 {
+            swap(mid._current_tuple, last._current_tuple);
+        } else {
+            swap(mid._current_tuple, first._current_tuple);
+        }
+    }
+    if (f_com_result == 1 && l_com_result == 1) {
+        if (_less_than_comp(reinterpret_cast<TupleRow 
*>(&first._current_tuple),
+                            reinterpret_cast<TupleRow 
*>(&last._current_tuple))) {
+            swap(mid._current_tuple, first._current_tuple);
+        } else {
+            swap(mid._current_tuple, last._current_tuple);
+        }
+    }
+}
+
 SpillSorter::TupleSorter::TupleIterator SpillSorter::TupleSorter::partition(
-        TupleIterator first, TupleIterator last, Tuple* pivot) {
-    // Copy pivot into temp_tuple since it points to a tuple within [first, 
last).
-    memcpy(_temp_tuple_buffer, pivot, _tuple_size);
+        TupleIterator first, TupleIterator last, TupleIterator& mid) {
+    find_the_median(first, last, mid);
 
-    last.prev();
+    // Copy &mid._current_tuple into temp_tuple since it points to a tuple 
within [first, last).
+    memcpy(_temp_tuple_buffer, mid._current_tuple, _tuple_size);
     while (true) {
         // Search for the first and last out-of-place elements, and swap them.
         while (_less_than_comp(
@@ -968,14 +995,22 @@ void SpillSorter::TupleSorter::sort_helper(TupleIterator 
first, TupleIterator la
     }
     // Use insertion sort for smaller sequences.
     while (last._index - first._index > INSERTION_THRESHOLD) {
-        TupleIterator iter(this, first._index + (last._index - first._index) / 
2);
-        DCHECK(iter._current_tuple != NULL);
-        // partition() splits the tuples in [first, last) into two groups (<= 
pivot
-        // and >= pivot) in-place. 'cut' is the index of the first tuple in 
the second group.
-        TupleIterator cut = partition(first, last,
-                reinterpret_cast<Tuple*>(iter._current_tuple));
-        sort_helper(cut, last);
-        last = cut;
+        TupleIterator mid(this, first._index + (last._index - first._index) / 
2);
+
+        DCHECK(mid._current_tuple != NULL);
+        // partition() splits the tuples in [first, last) into two groups (<=  
mid iter
+        // and >= mid iter) in-place. 'cut' is the index of the first tuple in 
the second group.
+        TupleIterator cut = partition(first, last, mid);
+
+        // Recurse on the smaller partition. This limits stack size to log(n) 
stack frames.
+        if (last._index - cut._index < cut._index - first._index) {
+            sort_helper(cut, last);
+            last = cut;
+        } else {
+            sort_helper(first, cut);
+            first = cut;
+        }
+       
         if (UNLIKELY(_state->is_cancelled())) {
             return;
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to