This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ac0322dc746 [Enchancement](sort) change priority_queue to ck 
SortingQueue (#45952)
ac0322dc746 is described below

commit ac0322dc7461df147367a20ed6499802bcd5c387
Author: Pxl <[email protected]>
AuthorDate: Tue Jan 7 14:56:09 2025 +0800

    [Enchancement](sort) change priority_queue to ck SortingQueue (#45952)
    
    1. change priority_queue to ck SortingQueue(a heap whitch support modify
    top element)
    2. avoid some convert_if_const usage
    
    
    
![QQ_1735532950407](https://github.com/user-attachments/assets/ec7b52c1-424b-4d7f-993c-64410ec35ba5)
---
 be/src/vec/columns/column.h                        |   1 -
 be/src/vec/common/pod_array.h                      |   8 +-
 be/src/vec/common/sort/partition_sorter.cpp        | 197 +++++++--------
 be/src/vec/common/sort/partition_sorter.h          |  19 +-
 be/src/vec/common/sort/sorter.cpp                  | 107 ++++----
 be/src/vec/common/sort/sorter.h                    |  37 +--
 be/src/vec/core/sort_cursor.h                      | 280 +++++++++++++++++++--
 .../apache/doris/planner/PartitionSortNode.java    |   5 +-
 8 files changed, 460 insertions(+), 194 deletions(-)

diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index 729e5470c97..bc8ab101ecf 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -196,7 +196,6 @@ public:
 
     /// Appends range of elements from other column with the same type.
     /// Could be used to concatenate columns.
-    /// TODO: we need `insert_range_from_const` for every column type.
     virtual void insert_range_from(const IColumn& src, size_t start, size_t 
length) = 0;
 
     /// Appends range of elements from other column with the same type.
diff --git a/be/src/vec/common/pod_array.h b/be/src/vec/common/pod_array.h
index f798ca69bd6..c55dc0cc33e 100644
--- a/be/src/vec/common/pod_array.h
+++ b/be/src/vec/common/pod_array.h
@@ -355,14 +355,14 @@ public:
     /// The index is signed to access -1th element without pointer overflow.
     T& operator[](ssize_t n) {
         /// <= size, because taking address of one element past memory range 
is Ok in C++ (expression like &arr[arr.size()] is perfectly valid).
-        assert((n >= (static_cast<ssize_t>(pad_left_) ? -1 : 0)) &&
-               (n <= static_cast<ssize_t>(this->size())));
+        DCHECK_GE(n, (static_cast<ssize_t>(pad_left_) ? -1 : 0));
+        DCHECK_LE(n, static_cast<ssize_t>(this->size()));
         return t_start()[n];
     }
 
     const T& operator[](ssize_t n) const {
-        assert((n >= (static_cast<ssize_t>(pad_left_) ? -1 : 0)) &&
-               (n <= static_cast<ssize_t>(this->size())));
+        DCHECK_GE(n, (static_cast<ssize_t>(pad_left_) ? -1 : 0));
+        DCHECK_LE(n, static_cast<ssize_t>(this->size()));
         return t_start()[n];
     }
 
diff --git a/be/src/vec/common/sort/partition_sorter.cpp 
b/be/src/vec/common/sort/partition_sorter.cpp
index c47a8a88952..f6a3cb443aa 100644
--- a/be/src/vec/common/sort/partition_sorter.cpp
+++ b/be/src/vec/common/sort/partition_sorter.cpp
@@ -49,9 +49,11 @@ PartitionSorter::PartitionSorter(VSortExecExprs& 
vsort_exec_exprs, int limit, in
         : Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order, 
nulls_first),
           _state(MergeSorterState::create_unique(row_desc, offset, limit, 
state, profile)),
           _row_desc(row_desc),
-          _has_global_limit(has_global_limit),
           _partition_inner_limit(partition_inner_limit),
-          _top_n_algorithm(top_n_algorithm),
+          _top_n_algorithm(
+                  has_global_limit
+                          ? TopNAlgorithm::ROW_NUMBER
+                          : top_n_algorithm), // FE will make this 
modification, but still maintain this code for compatibility
           _previous_row(previous_row) {}
 
 Status PartitionSorter::append_block(Block* input_block) {
@@ -64,10 +66,13 @@ Status PartitionSorter::append_block(Block* input_block) {
 
 Status PartitionSorter::prepare_for_read() {
     auto& blocks = _state->get_sorted_block();
-    auto& priority_queue = _state->get_priority_queue();
+    auto& queue = _state->get_queue();
+    std::vector<MergeSortCursor> cursors;
     for (auto& block : blocks) {
-        priority_queue.emplace(MergeSortCursorImpl::create_shared(block, 
_sort_description));
+        cursors.emplace_back(
+                MergeSortCursorImpl::create_shared(std::move(block), 
_sort_description));
     }
+    queue = MergeSorterQueue(cursors);
     blocks.clear();
     return Status::OK();
 }
@@ -88,122 +93,114 @@ void PartitionSorter::reset_sorter_state(RuntimeState* 
runtime_state) {
 }
 
 Status PartitionSorter::get_next(RuntimeState* state, Block* block, bool* eos) 
{
-    if (_state->get_priority_queue().empty()) {
-        *eos = true;
-    } else if (_state->get_priority_queue().size() == 1 && _has_global_limit) {
-        block->swap(*_state->get_priority_queue().top().impl->block);
-        block->set_num_rows(_partition_inner_limit);
-        *eos = true;
+    if (_top_n_algorithm == TopNAlgorithm::ROW_NUMBER) {
+        return _read_row_num(block, eos, state->batch_size());
     } else {
-        RETURN_IF_ERROR(partition_sort_read(block, eos, state->batch_size()));
+        return _read_row_rank(block, eos, state->batch_size());
     }
-    return Status::OK();
 }
 
-Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, 
int batch_size) {
-    auto& priority_queue = _state->get_priority_queue();
-    const auto& sorted_block = priority_queue.top().impl->block;
-    size_t num_columns = sorted_block->columns();
+Status PartitionSorter::_read_row_num(Block* output_block, bool* eos, int 
batch_size) {
+    auto& queue = _state->get_queue();
+    size_t num_columns = _state->unsorted_block()->columns();
+
     MutableBlock m_block =
-            VectorizedUtils::build_mutable_mem_reuse_block(output_block, 
*sorted_block);
+            VectorizedUtils::build_mutable_mem_reuse_block(output_block, 
*_state->unsorted_block());
     MutableColumns& merged_columns = m_block.mutable_columns();
-    size_t current_output_rows = 0;
-
-    bool get_enough_data = false;
-    while (!priority_queue.empty()) {
-        auto current = priority_queue.top();
-        priority_queue.pop();
-        if (UNLIKELY(_previous_row->impl == nullptr)) {
-            *_previous_row = current;
+    size_t merged_rows = 0;
+
+    Defer defer {[&]() {
+        if (merged_rows == 0 || _get_enough_data()) {
+            *eos = true;
         }
+    }};
 
-        switch (_top_n_algorithm) {
-        case TopNAlgorithm::ROW_NUMBER: {
-            //1 row_number no need to check distinct, just output 
partition_inner_limit row
-            if ((current_output_rows + _output_total_rows) < 
_partition_inner_limit) {
-                for (size_t i = 0; i < num_columns; ++i) {
-                    
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
-                }
-            } else {
-                //rows has get enough
-                get_enough_data = true;
+    while (queue.is_valid() && merged_rows < batch_size && 
!_get_enough_data()) {
+        auto [current, current_rows] = queue.current();
+
+        // row_number no need to check distinct, just output 
partition_inner_limit row
+        size_t needed_rows = _partition_inner_limit - _output_total_rows;
+        size_t step = std::min(needed_rows, std::min(current_rows, batch_size 
- merged_rows));
+
+        if (current->impl->is_last(step) && current->impl->pos == 0) {
+            if (merged_rows != 0) {
+                // return directly for next time's read swap whole block
+                return Status::OK();
             }
-            current_output_rows++;
-            break;
+            // swap and return block directly when we should get all data from 
cursor
+            output_block->swap(*current->impl->block);
+            merged_rows += step;
+            _output_total_rows += step;
+            queue.remove_top();
+            return Status::OK();
         }
-        case TopNAlgorithm::DENSE_RANK: {
-            //  dense_rank(): 1,1,1,2,2,2,2,.......,2,3,3,3, if SQL: where rk 
< 3, need output all 1 and 2
-            //3 dense_rank() maybe need distinct rows of partition_inner_limit
-            //3.1 _has_global_limit = true, so check (current_output_rows + 
_output_total_rows) >= _partition_inner_limit)
-            //3.2 _has_global_limit = false. so check have output distinct 
rows, not _output_total_rows
-            if (_has_global_limit &&
-                (current_output_rows + _output_total_rows) >= 
_partition_inner_limit) {
-                get_enough_data = true;
-                break;
-            }
-            if (_has_global_limit) {
-                current_output_rows++;
-            } else {
-                bool cmp_res = _previous_row->compare_two_rows(current);
-                //get a distinct row
-                if (cmp_res == false) {
-                    _output_distinct_rows++; //need rows++ firstly
-                    if (_output_distinct_rows >= _partition_inner_limit) {
-                        get_enough_data = true;
-                        break;
-                    }
-                    *_previous_row = current;
-                }
-            }
+
+        if (step) {
+            merged_rows += step;
+            _output_total_rows += step;
             for (size_t i = 0; i < num_columns; ++i) {
-                
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
+                
merged_columns[i]->insert_range_from(*current->impl->columns[i], 
current->impl->pos,
+                                                     step);
             }
-            break;
         }
-        case TopNAlgorithm::RANK: {
-            //  rank(): 1,1,1,4,5,6,6,6.....,6,100,101. if SQL where rk < 7, 
need output all 1,1,1,4,5,6,6,....6
-            //2 rank() maybe need check when have get a distinct row
-            //2.1 _has_global_limit = true: (current_output_rows + 
_output_total_rows) >= _partition_inner_limit)
-            //2.2 _has_global_limit = false: so when the cmp_res is get a 
distinct row, need check have output all rows num
-            if (_has_global_limit &&
-                (current_output_rows + _output_total_rows) >= 
_partition_inner_limit) {
-                get_enough_data = true;
-                break;
-            }
-            bool cmp_res = _previous_row->compare_two_rows(current);
-            //get a distinct row
-            if (cmp_res == false) {
-                //here must be check distinct of two rows, and then check nums 
of row
-                if ((current_output_rows + _output_total_rows) >= 
_partition_inner_limit) {
-                    get_enough_data = true;
-                    break;
+
+        if (!current->impl->is_last(step)) {
+            queue.next(step);
+        } else {
+            queue.remove_top();
+        }
+    }
+
+    return Status::OK();
+}
+
+Status PartitionSorter::_read_row_rank(Block* output_block, bool* eos, int 
batch_size) {
+    auto& queue = _state->get_queue();
+    size_t num_columns = _state->unsorted_block()->columns();
+
+    MutableBlock m_block =
+            VectorizedUtils::build_mutable_mem_reuse_block(output_block, 
*_state->unsorted_block());
+    MutableColumns& merged_columns = m_block.mutable_columns();
+    size_t merged_rows = 0;
+
+    Defer defer {[&]() {
+        if (merged_rows == 0 || _get_enough_data()) {
+            *eos = true;
+        }
+    }};
+
+    while (queue.is_valid() && merged_rows < batch_size) {
+        auto [current, current_rows] = queue.current();
+
+        for (size_t offset = 0; offset < current_rows && merged_rows < 
batch_size; offset++) {
+            bool cmp_res = _previous_row->impl && 
_previous_row->compare_two_rows(current->impl);
+            if (!cmp_res) {
+                // 1. dense_rank(): 1,1,1,2,2,2,2,.......,2,3,3,3, if SQL: 
where rk < 3, need output all 1 and 2
+                // dense_rank() maybe need distinct rows of 
partition_inner_limit
+                // so check have output distinct rows, not _output_total_rows
+                // 2. rank(): 1,1,1,4,5,6,6,6.....,6,100,101. if SQL where rk 
< 7, need output all 1,1,1,4,5,6,6,....6
+                // rank() maybe need check when have get a distinct row
+                // so when the cmp_res is get a distinct row, need check have 
output all rows num
+                if (_get_enough_data()) {
+                    return Status::OK();
                 }
-                *_previous_row = current;
+                *_previous_row = *current;
+                _output_distinct_rows++;
             }
             for (size_t i = 0; i < num_columns; ++i) {
-                
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
+                
merged_columns[i]->insert_from(*current->impl->block->get_columns()[i],
+                                               current->impl->pos);
+            }
+            merged_rows++;
+            _output_total_rows++;
+            if (!current->impl->is_last(1)) {
+                queue.next(1);
+            } else {
+                queue.remove_top();
             }
-            current_output_rows++;
-            break;
-        }
-        default:
-            break;
-        }
-
-        if (!current->is_last()) {
-            current->next();
-            priority_queue.push(current);
-        }
-
-        if (current_output_rows == batch_size || get_enough_data == true) {
-            break;
         }
     }
 
-    _output_total_rows += output_block->rows();
-    if (current_output_rows == 0 || get_enough_data == true) {
-        *eos = true;
-    }
     return Status::OK();
 }
 
diff --git a/be/src/vec/common/sort/partition_sorter.h 
b/be/src/vec/common/sort/partition_sorter.h
index 0939dcd40cd..053b3aa1a29 100644
--- a/be/src/vec/common/sort/partition_sorter.h
+++ b/be/src/vec/common/sort/partition_sorter.h
@@ -90,17 +90,30 @@ public:
     Status get_next(RuntimeState* state, Block* block, bool* eos) override;
 
     size_t data_size() const override { return _state->data_size(); }
-
-    Status partition_sort_read(Block* block, bool* eos, int batch_size);
     int64 get_output_rows() const { return _output_total_rows; }
     void reset_sorter_state(RuntimeState* runtime_state);
 
 private:
+    Status _read_row_num(Block* block, bool* eos, int batch_size);
+    Status _read_row_rank(Block* block, bool* eos, int batch_size);
+    bool _get_enough_data() const {
+        if (_top_n_algorithm == TopNAlgorithm::DENSE_RANK) {
+            // dense_rank(): 1,1,1,2,2,2,2,.......,2,3,3,3, if SQL: where rk < 
3, need output all 1 and 2
+            // dense_rank() maybe need distinct rows of partition_inner_limit
+            // so check have output distinct rows, not _output_total_rows
+            return _output_distinct_rows >= _partition_inner_limit;
+        } else {
+            // rank(): 1,1,1,4,5,6,6,6.....,6,100,101. if SQL where rk < 7, 
need output all 1,1,1,4,5,6,6,....6
+            // rank() maybe need check when have get a distinct row
+            // so when the cmp_res is get a distinct row, need check have 
output all rows num
+            return _output_total_rows >= _partition_inner_limit;
+        }
+    }
+
     std::unique_ptr<MergeSorterState> _state;
     const RowDescriptor& _row_desc;
     int64 _output_total_rows = 0;
     int64 _output_distinct_rows = 0;
-    bool _has_global_limit = false;
     int _partition_inner_limit = 0;
     TopNAlgorithm::type _top_n_algorithm = TopNAlgorithm::type::ROW_NUMBER;
     SortCursorCmp* _previous_row = nullptr;
diff --git a/be/src/vec/common/sort/sorter.cpp 
b/be/src/vec/common/sort/sorter.cpp
index 4f7de1d379a..0fddcf01182 100644
--- a/be/src/vec/common/sort/sorter.cpp
+++ b/be/src/vec/common/sort/sorter.cpp
@@ -57,13 +57,11 @@ namespace doris::vectorized {
 //
 
 void MergeSorterState::reset() {
-    auto empty_queue = std::priority_queue<MergeSortCursor>();
-    priority_queue_.swap(empty_queue);
     std::vector<std::shared_ptr<MergeSortCursorImpl>> empty_cursors(0);
     std::vector<std::shared_ptr<Block>> empty_blocks(0);
-    sorted_blocks_.swap(empty_blocks);
-    unsorted_block_ = Block::create_unique(unsorted_block_->clone_empty());
-    in_mem_sorted_bocks_size_ = 0;
+    _sorted_blocks.swap(empty_blocks);
+    unsorted_block() = Block::create_unique(unsorted_block()->clone_empty());
+    _in_mem_sorted_bocks_size = 0;
 }
 
 void MergeSorterState::add_sorted_block(std::shared_ptr<Block> block) {
@@ -71,72 +69,80 @@ void 
MergeSorterState::add_sorted_block(std::shared_ptr<Block> block) {
     if (0 == rows) {
         return;
     }
-    in_mem_sorted_bocks_size_ += block->bytes();
-    sorted_blocks_.emplace_back(block);
-    num_rows_ += rows;
+    _in_mem_sorted_bocks_size += block->bytes();
+    _sorted_blocks.emplace_back(block);
+    _num_rows += rows;
 }
 
 Status MergeSorterState::build_merge_tree(const SortDescription& 
sort_description) {
-    for (auto& block : sorted_blocks_) {
-        priority_queue_.emplace(
+    std::vector<MergeSortCursor> cursors;
+    for (auto& block : _sorted_blocks) {
+        cursors.emplace_back(
                 MergeSortCursorImpl::create_shared(std::move(block), 
sort_description));
     }
+    _queue = MergeSorterQueue(cursors);
 
-    sorted_blocks_.clear();
+    _sorted_blocks.clear();
     return Status::OK();
 }
 
 Status MergeSorterState::merge_sort_read(doris::vectorized::Block* block, int 
batch_size,
                                          bool* eos) {
-    DCHECK(sorted_blocks_.empty());
-    DCHECK(unsorted_block_->empty());
-    if (priority_queue_.empty()) {
-        *eos = true;
-    } else if (priority_queue_.size() == 1) {
-        if (offset_ != 0 || priority_queue_.top()->pos != 0) {
-            // Skip rows already returned or need to be ignored
-            int64_t offset = offset_ + (int64_t)priority_queue_.top()->pos;
-            priority_queue_.top().impl->block->skip_num_rows(offset);
-        }
-        block->swap(*priority_queue_.top().impl->block);
-        *eos = true;
-    } else {
-        RETURN_IF_ERROR(_merge_sort_read_impl(batch_size, block, eos));
-    }
+    DCHECK(_sorted_blocks.empty());
+    DCHECK(unsorted_block()->empty());
+    RETURN_IF_ERROR(_merge_sort_read_impl(batch_size, block, eos));
     return Status::OK();
 }
 
 Status MergeSorterState::_merge_sort_read_impl(int batch_size, 
doris::vectorized::Block* block,
                                                bool* eos) {
-    size_t num_columns = priority_queue_.top().impl->block->columns();
+    size_t num_columns = unsorted_block()->columns();
 
-    MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block(
-            block, *priority_queue_.top().impl->block);
+    MutableBlock m_block = 
VectorizedUtils::build_mutable_mem_reuse_block(block, *unsorted_block());
     MutableColumns& merged_columns = m_block.mutable_columns();
 
     /// Take rows from queue in right order and push to 'merged'.
     size_t merged_rows = 0;
     // process single element queue on merge_sort_read()
-    while (priority_queue_.size() > 1 && merged_rows < batch_size) {
-        auto current = priority_queue_.top();
-        priority_queue_.pop();
+    while (_queue.is_valid() && merged_rows < batch_size) {
+        auto [current, current_rows] = _queue.current();
+        current_rows = std::min(current_rows, batch_size - merged_rows);
+
+        size_t step = std::min(_offset, current_rows);
+        _offset -= step;
+        current_rows -= step;
+
+        if (current->impl->is_last(current_rows + step) && current->impl->pos 
== 0 && step == 0) {
+            if (merged_rows != 0) {
+                // return directly for next time's read swap whole block
+                return Status::OK();
+            }
+            // swap and return block directly when we should get all data from 
cursor
+            block->swap(*current->impl->block);
+            _queue.remove_top();
+            return Status::OK();
+        }
 
-        if (offset_ == 0) {
+        if (current_rows) {
             for (size_t i = 0; i < num_columns; ++i) {
-                
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
+                
merged_columns[i]->insert_range_from(*current->impl->columns[i],
+                                                     current->impl->pos + 
step, current_rows);
             }
-            ++merged_rows;
-        } else {
-            offset_--;
+            merged_rows += current_rows;
         }
 
-        if (!current->is_last()) {
-            current->next();
-            priority_queue_.push(current);
+        if (!current->impl->is_last(current_rows + step)) {
+            _queue.next(current_rows + step);
+        } else {
+            _queue.remove_top();
         }
     }
 
     block->set_columns(std::move(merged_columns));
+
+    if (merged_rows == 0) {
+        *eos = true;
+    }
     return Status::OK();
 }
 
@@ -207,23 +213,28 @@ FullSorter::FullSorter(VSortExecExprs& vsort_exec_exprs, 
int limit, int64_t offs
 Status FullSorter::append_block(Block* block) {
     DCHECK(block->rows() > 0);
 
-    if (_reach_limit() && block->bytes() > 
_state->unsorted_block_->allocated_bytes() -
-                                                   
_state->unsorted_block_->bytes()) {
+    if (_reach_limit() && block->bytes() > 
_state->unsorted_block()->allocated_bytes() -
+                                                   
_state->unsorted_block()->bytes()) {
         RETURN_IF_ERROR(_do_sort());
     }
 
     {
         SCOPED_TIMER(_merge_block_timer);
-        const auto& data = 
_state->unsorted_block_->get_columns_with_type_and_name();
+        const auto& data = 
_state->unsorted_block()->get_columns_with_type_and_name();
         const auto& arrival_data = block->get_columns_with_type_and_name();
         auto sz = block->rows();
         for (int i = 0; i < data.size(); ++i) {
             DCHECK(data[i].type->equals(*(arrival_data[i].type)))
                     << " type1: " << data[i].type->get_name()
                     << " type2: " << arrival_data[i].type->get_name() << " i: 
" << i;
-            //TODO: to eliminate unnecessary expansion, we need a 
`insert_range_from_const` for every column type.
-            data[i].column->assume_mutable()->insert_range_from(
-                    
*arrival_data[i].column->convert_to_full_column_if_const(), 0, sz);
+            if (is_column_const(*arrival_data[i].column)) {
+                data[i].column->assume_mutable()->insert_many_from(
+                        assert_cast<const 
ColumnConst*>(arrival_data[i].column.get())
+                                ->get_data_column(),
+                        0, sz);
+            } else {
+                
data[i].column->assume_mutable()->insert_range_from(*arrival_data[i].column, 0, 
sz);
+            }
         }
         block->clear_column_data();
     }
@@ -231,7 +242,7 @@ Status FullSorter::append_block(Block* block) {
 }
 
 Status FullSorter::prepare_for_read() {
-    if (_state->unsorted_block_->rows() > 0) {
+    if (_state->unsorted_block()->rows() > 0) {
         RETURN_IF_ERROR(_do_sort());
     }
     return _state->build_merge_tree(_sort_description);
@@ -247,7 +258,7 @@ Status FullSorter::merge_sort_read_for_spill(RuntimeState* 
state, doris::vectori
 }
 
 Status FullSorter::_do_sort() {
-    Block* src_block = _state->unsorted_block_.get();
+    Block* src_block = _state->unsorted_block().get();
     Block desc_block = src_block->clone_without_columns();
     RETURN_IF_ERROR(partial_sort(*src_block, desc_block));
 
diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h
index 36c535c9101..69e70ab869c 100644
--- a/be/src/vec/common/sort/sorter.h
+++ b/be/src/vec/common/sort/sorter.h
@@ -43,6 +43,8 @@ class RowDescriptor;
 
 namespace doris::vectorized {
 
+using MergeSorterQueue = SortingQueueBatch<MergeSortCursor>;
+
 // TODO: now we only use merge sort
 class MergeSorterState {
     ENABLE_FACTORY_CREATOR(MergeSorterState);
@@ -53,9 +55,9 @@ public:
             // create_empty_block should ignore invalid slots, unsorted_block
             // should be same structure with arrival block from child node
             // since block from child node may ignored these slots
-            : unsorted_block_(Block::create_unique(
+            : _unsorted_block(Block::create_unique(
                       VectorizedUtils::create_empty_block(row_desc, true 
/*ignore invalid slot*/))),
-              offset_(offset) {}
+              _offset(offset) {}
 
     ~MergeSorterState() = default;
 
@@ -66,32 +68,33 @@ public:
     Status merge_sort_read(doris::vectorized::Block* block, int batch_size, 
bool* eos);
 
     size_t data_size() const {
-        size_t size = unsorted_block_->bytes();
-        return size + in_mem_sorted_bocks_size_;
+        size_t size = _unsorted_block->bytes();
+        return size + _in_mem_sorted_bocks_size;
     }
 
-    uint64_t num_rows() const { return num_rows_; }
+    uint64_t num_rows() const { return _num_rows; }
 
-    std::shared_ptr<Block> last_sorted_block() { return sorted_blocks_.back(); 
}
+    std::shared_ptr<Block> last_sorted_block() { return _sorted_blocks.back(); 
}
 
-    std::vector<std::shared_ptr<Block>>& get_sorted_block() { return 
sorted_blocks_; }
-    std::priority_queue<MergeSortCursor>& get_priority_queue() { return 
priority_queue_; }
+    std::vector<std::shared_ptr<Block>>& get_sorted_block() { return 
_sorted_blocks; }
+    MergeSorterQueue& get_queue() { return _queue; }
     void reset();
 
-    std::unique_ptr<Block> unsorted_block_;
+    std::unique_ptr<Block>& unsorted_block() { return _unsorted_block; }
 
 private:
     Status _merge_sort_read_impl(int batch_size, doris::vectorized::Block* 
block, bool* eos);
 
-    std::priority_queue<MergeSortCursor> priority_queue_;
-    std::vector<std::shared_ptr<Block>> sorted_blocks_;
-    size_t in_mem_sorted_bocks_size_ = 0;
-    uint64_t num_rows_ = 0;
+    std::unique_ptr<Block> _unsorted_block;
+    MergeSorterQueue _queue;
+    std::vector<std::shared_ptr<Block>> _sorted_blocks;
+    size_t _in_mem_sorted_bocks_size = 0;
+    uint64_t _num_rows = 0;
 
-    int64_t offset_;
+    size_t _offset;
 
-    Block merge_sorted_block_;
-    std::unique_ptr<VSortedRunMerger> merger_;
+    Block _merge_sorted_block;
+    std::unique_ptr<VSortedRunMerger> _merger;
 };
 
 class Sorter {
@@ -177,7 +180,7 @@ public:
 
 private:
     bool _reach_limit() {
-        return _state->unsorted_block_->allocated_bytes() >= 
buffered_block_bytes_;
+        return _state->unsorted_block()->allocated_bytes() >= 
buffered_block_bytes_;
     }
 
     Status _do_sort();
diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h
index d31767f46e4..a37b6feb21e 100644
--- a/be/src/vec/core/sort_cursor.h
+++ b/be/src/vec/core/sort_cursor.h
@@ -20,6 +20,8 @@
 
 #pragma once
 
+#include <utility>
+
 #include "vec/columns/column.h"
 #include "vec/core/block.h"
 #include "vec/core/sort_description.h"
@@ -48,8 +50,7 @@ private:
     void _reset() {
         sort_columns.clear();
         auto columns = block.get_columns_and_convert();
-        for (size_t j = 0, size = desc.size(); j < size; ++j) {
-            auto& column_desc = desc[j];
+        for (auto& column_desc : desc) {
             size_t column_number = !column_desc.column_name.empty()
                                            ? 
block.get_position_by_name(column_desc.column_name)
                                            : column_desc.column_number;
@@ -63,7 +64,7 @@ using HeapSortCursorBlockSPtr = 
std::shared_ptr<HeapSortCursorBlockView>;
 struct HeapSortCursorImpl {
 public:
     HeapSortCursorImpl(int row_id, HeapSortCursorBlockSPtr block_view)
-            : _row_id(row_id), _block_view(block_view) {}
+            : _row_id(row_id), _block_view(std::move(block_view)) {}
 
     HeapSortCursorImpl(const HeapSortCursorImpl& other) {
         _row_id = other._row_id;
@@ -123,6 +124,7 @@ struct MergeSortCursorImpl {
     ENABLE_FACTORY_CREATOR(MergeSortCursorImpl);
     std::shared_ptr<Block> block;
     ColumnRawPtrs sort_columns;
+    ColumnRawPtrs columns;
     SortDescription desc;
     size_t sort_columns_size = 0;
     size_t pos = 0;
@@ -131,26 +133,33 @@ struct MergeSortCursorImpl {
     MergeSortCursorImpl() = default;
     virtual ~MergeSortCursorImpl() = default;
 
-    MergeSortCursorImpl(std::shared_ptr<Block> block_, const SortDescription& 
desc_)
-            : block(block_), desc(desc_), sort_columns_size(desc.size()) {
+    MergeSortCursorImpl(std::shared_ptr<Block> block_, SortDescription desc_)
+            : block(std::move(block_)), desc(std::move(desc_)), 
sort_columns_size(desc.size()) {
         reset();
     }
 
-    MergeSortCursorImpl(const SortDescription& desc_)
-            : block(Block::create_shared()), desc(desc_), 
sort_columns_size(desc.size()) {}
+    MergeSortCursorImpl(SortDescription desc_)
+            : block(Block::create_shared()),
+              desc(std::move(desc_)),
+              sort_columns_size(desc.size()) {}
+
     bool empty() const { return rows == 0; }
 
     /// Set the cursor to the beginning of the new block.
     void reset() {
         sort_columns.clear();
+        columns.clear();
 
-        auto columns = block->get_columns_and_convert();
-        for (size_t j = 0, size = desc.size(); j < size; ++j) {
-            auto& column_desc = desc[j];
+        auto tmp_columns = block->get_columns_and_convert();
+        columns.reserve(tmp_columns.size());
+        for (auto col : tmp_columns) {
+            columns.push_back(col.get());
+        }
+        for (auto& column_desc : desc) {
             size_t column_number = !column_desc.column_name.empty()
                                            ? 
block->get_position_by_name(column_desc.column_name)
                                            : column_desc.column_number;
-            sort_columns.push_back(columns[column_number].get());
+            sort_columns.push_back(columns[column_number]);
         }
 
         pos = 0;
@@ -158,8 +167,9 @@ struct MergeSortCursorImpl {
     }
 
     bool is_first() const { return pos == 0; }
-    bool is_last() const { return pos + 1 >= rows; }
-    void next() { ++pos; }
+    bool is_last(size_t size = 1) const { return pos + size >= rows; }
+    void next(size_t size = 1) { pos += size; }
+    size_t get_size() const { return rows; }
 
     virtual bool has_next_block() { return false; }
     virtual Block* block_ptr() { return nullptr; }
@@ -169,11 +179,11 @@ using BlockSupplier = std::function<Status(Block*, bool* 
eos)>;
 
 struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl {
     ENABLE_FACTORY_CREATOR(BlockSupplierSortCursorImpl);
-    BlockSupplierSortCursorImpl(const BlockSupplier& block_supplier,
+    BlockSupplierSortCursorImpl(BlockSupplier block_supplier,
                                 const VExprContextSPtrs& ordering_expr,
                                 const std::vector<bool>& is_asc_order,
                                 const std::vector<bool>& nulls_first)
-            : _ordering_expr(ordering_expr), _block_supplier(block_supplier) {
+            : _ordering_expr(ordering_expr), 
_block_supplier(std::move(block_supplier)) {
         block = Block::create_shared();
         sort_columns_size = ordering_expr.size();
 
@@ -185,8 +195,8 @@ struct BlockSupplierSortCursorImpl : public 
MergeSortCursorImpl {
         _is_eof = !has_next_block();
     }
 
-    BlockSupplierSortCursorImpl(const BlockSupplier& block_supplier, const 
SortDescription& desc_)
-            : MergeSortCursorImpl(desc_), _block_supplier(block_supplier) {
+    BlockSupplierSortCursorImpl(BlockSupplier block_supplier, const 
SortDescription& desc_)
+            : MergeSortCursorImpl(desc_), 
_block_supplier(std::move(block_supplier)) {
         _is_eof = !has_next_block();
     }
 
@@ -202,7 +212,7 @@ struct BlockSupplierSortCursorImpl : public 
MergeSortCursorImpl {
         // If status not ok, upper callers could not detect whether it is eof 
or error.
         // So that fatal here, and should throw exception in the future.
         if (status.ok() && !block->empty()) {
-            if (_ordering_expr.size() > 0) {
+            if (!_ordering_expr.empty()) {
                 for (int i = 0; status.ok() && i < desc.size(); ++i) {
                     // TODO yiguolei: throw exception if status not ok in the 
future
                     status = _ordering_expr[i]->execute(block.get(), 
&desc[i].column_number);
@@ -233,7 +243,7 @@ struct MergeSortCursor {
     ENABLE_FACTORY_CREATOR(MergeSortCursor);
     std::shared_ptr<MergeSortCursorImpl> impl;
 
-    MergeSortCursor(std::shared_ptr<MergeSortCursorImpl> impl_) : impl(impl_) 
{}
+    MergeSortCursor(std::shared_ptr<MergeSortCursorImpl> impl_) : 
impl(std::move(impl_)) {}
     MergeSortCursorImpl* operator->() const { return impl.get(); }
 
     /// The specified row of this cursor is greater than the specified row of 
another cursor.
@@ -264,6 +274,21 @@ struct MergeSortCursor {
         return greater_at(rhs, impl->rows - 1, 0) == -1;
     }
 
+    /// Checks that all rows in the current block of this cursor are less than 
or equal to all the rows of the current block of another cursor.
+    bool totally_less_or_equals(const MergeSortCursor& rhs) const {
+        if (impl->rows == 0 || rhs.impl->rows == 0) {
+            return false;
+        }
+
+        /// The last row of this cursor is no larger than the first row of the 
another cursor.
+        return greater_at(rhs, impl->rows - 1, rhs->pos) <= 0;
+    }
+
+    bool greater_with_offset(const MergeSortCursor& rhs, size_t lhs_offset,
+                             size_t rhs_offset) const {
+        return greater_at(rhs, impl->pos + lhs_offset, rhs.impl->pos + 
rhs_offset) > 0;
+    }
+
     bool greater(const MergeSortCursor& rhs) const {
         return !impl->empty() && greater_at(rhs, impl->pos, rhs.impl->pos) > 0;
     }
@@ -277,7 +302,7 @@ struct MergeSortBlockCursor {
     ENABLE_FACTORY_CREATOR(MergeSortBlockCursor);
     std::shared_ptr<MergeSortCursorImpl> impl = nullptr;
 
-    MergeSortBlockCursor(std::shared_ptr<MergeSortCursorImpl> impl_) : 
impl(impl_) {}
+    MergeSortBlockCursor(std::shared_ptr<MergeSortCursorImpl> impl_) : 
impl(std::move(impl_)) {}
     MergeSortCursorImpl* operator->() const { return impl.get(); }
 
     /// The specified row of this cursor is greater than the specified row of 
another cursor.
@@ -314,4 +339,219 @@ struct MergeSortBlockCursor {
     }
 };
 
+enum class SortingQueueStrategy : uint8_t { Default, Batch };
+
+/// Allows to fetch data from multiple sort cursors in sorted order (merging 
sorted data streams).
+template <typename Cursor, SortingQueueStrategy strategy>
+class SortingQueueImpl {
+public:
+    SortingQueueImpl() = default;
+
+    template <typename Cursors>
+    explicit SortingQueueImpl(Cursors& cursors) {
+        size_t size = cursors.size();
+        _queue.reserve(size);
+
+        for (size_t i = 0; i < size; ++i) {
+            _queue.emplace_back(cursors[i]);
+        }
+
+        std::make_heap(_queue.begin(), _queue.end());
+
+        if constexpr (strategy == SortingQueueStrategy::Batch) {
+            if (!_queue.empty()) {
+                update_batch_size();
+            }
+        }
+    }
+
+    bool is_valid() const { return !_queue.empty(); }
+
+    Cursor& current()
+        requires(strategy == SortingQueueStrategy::Default)
+    {
+        return &_queue.front();
+    }
+
+    std::pair<Cursor*, size_t> current()
+        requires(strategy == SortingQueueStrategy::Batch)
+    {
+        return {&_queue.front(), batch_size};
+    }
+
+    size_t size() { return _queue.size(); }
+
+    Cursor& next_child() { return _queue[next_child_index()]; }
+
+    void ALWAYS_INLINE next()
+        requires(strategy == SortingQueueStrategy::Default)
+    {
+        assert(is_valid());
+
+        if (!_queue.front()->is_last()) {
+            _queue.front()->next();
+            update_top(true);
+        } else {
+            remove_top();
+        }
+    }
+
+    void ALWAYS_INLINE next(size_t batch_size_value)
+        requires(strategy == SortingQueueStrategy::Batch)
+    {
+        assert(is_valid());
+        assert(batch_size_value <= batch_size);
+        assert(batch_size_value > 0);
+
+        batch_size -= batch_size_value;
+        if (batch_size > 0) {
+            _queue.front()->next(batch_size_value);
+            return;
+        }
+
+        if (!_queue.front()->is_last(batch_size_value)) {
+            _queue.front()->next(batch_size_value);
+            update_top(false);
+        } else {
+            remove_top();
+        }
+    }
+
+    void remove_top() {
+        std::pop_heap(_queue.begin(), _queue.end());
+        _queue.pop_back();
+        next_child_idx = 0;
+
+        if constexpr (strategy == SortingQueueStrategy::Batch) {
+            if (_queue.empty()) {
+                batch_size = 0;
+            } else {
+                update_batch_size();
+            }
+        }
+    }
+
+    void push(MergeSortCursorImpl& cursor) {
+        _queue.emplace_back(&cursor);
+        std::push_heap(_queue.begin(), _queue.end());
+        next_child_idx = 0;
+
+        if constexpr (strategy == SortingQueueStrategy::Batch) {
+            update_batch_size();
+        }
+    }
+
+private:
+    using Container = std::vector<Cursor>;
+    Container _queue;
+
+    /// Cache comparison between first and second child if the order in queue 
has not been changed.
+    size_t next_child_idx = 0;
+    size_t batch_size = 0;
+
+    size_t ALWAYS_INLINE next_child_index() {
+        if (next_child_idx == 0) {
+            next_child_idx = 1;
+
+            if (_queue.size() > 2 && _queue[1].greater(_queue[2])) {
+                ++next_child_idx;
+            }
+        }
+
+        return next_child_idx;
+    }
+
+    /// This is adapted version of the function __sift_down from libc++.
+    /// Why cannot simply use std::priority_queue?
+    /// - because it doesn't support updating the top element and requires pop 
and push instead.
+    /// Also look at "Boost.Heap" library.
+    void ALWAYS_INLINE update_top(bool check_in_order) {
+        size_t size = _queue.size();
+        if (size < 2) {
+            return;
+        }
+
+        auto begin = _queue.begin();
+
+        size_t child_idx = next_child_index();
+        auto child_it = begin + child_idx;
+
+        /// Check if we are in order.
+        if (check_in_order && (*child_it).greater(*begin)) {
+            if constexpr (strategy == SortingQueueStrategy::Batch) {
+                update_batch_size();
+            }
+            return;
+        }
+
+        next_child_idx = 0;
+
+        auto curr_it = begin;
+        auto top(std::move(*begin));
+        do {
+            /// We are not in heap-order, swap the parent with it's largest 
child.
+            *curr_it = std::move(*child_it);
+            curr_it = child_it;
+
+            // recompute the child based off of the updated parent
+            child_idx = 2 * child_idx + 1;
+
+            if (child_idx >= size) {
+                break;
+            }
+
+            child_it = begin + child_idx;
+
+            if ((child_idx + 1) < size && (*child_it).greater(*(child_it + 
1))) {
+                /// Right child exists and is greater than left child.
+                ++child_it;
+                ++child_idx;
+            }
+
+            /// Check if we are in order.
+        } while (!((*child_it).greater(top)));
+        *curr_it = std::move(top);
+
+        if constexpr (strategy == SortingQueueStrategy::Batch) {
+            update_batch_size();
+        }
+    }
+
+    /// Update batch size of elements that client can extract from current 
cursor
+    void update_batch_size() {
+        DCHECK(!_queue.empty());
+
+        auto& begin_cursor = *_queue.begin();
+        size_t min_cursor_size = begin_cursor->get_size();
+        size_t min_cursor_pos = begin_cursor->pos;
+
+        if (_queue.size() == 1) {
+            batch_size = min_cursor_size - min_cursor_pos;
+            return;
+        }
+
+        batch_size = 1;
+        size_t child_idx = next_child_index();
+        auto& next_child_cursor = *(_queue.begin() + child_idx);
+        if (min_cursor_pos + batch_size < min_cursor_size &&
+            next_child_cursor.greater_with_offset(begin_cursor, 0, 
batch_size)) {
+            ++batch_size;
+        } else {
+            return;
+        }
+        if (begin_cursor.totally_less_or_equals(next_child_cursor)) {
+            batch_size = min_cursor_size - min_cursor_pos;
+            return;
+        }
+
+        while (min_cursor_pos + batch_size < min_cursor_size &&
+               next_child_cursor.greater_with_offset(begin_cursor, 0, 
batch_size)) {
+            ++batch_size;
+        }
+    }
+};
+template <typename Cursor>
+using SortingQueue = SortingQueueImpl<Cursor, SortingQueueStrategy::Default>;
+template <typename Cursor>
+using SortingQueueBatch = SortingQueueImpl<Cursor, 
SortingQueueStrategy::Batch>;
 } // namespace doris::vectorized
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionSortNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionSortNode.java
index 20142e380ce..69a1b871d1b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionSortNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionSortNode.java
@@ -142,7 +142,10 @@ public class PartitionSortNode extends PlanNode {
         Preconditions.checkState(tupleIds.size() == 1, "Incorrect size for 
tupleIds in PartitionSortNode");
 
         TopNAlgorithm topNAlgorithm;
-        if (function == WindowFuncType.ROW_NUMBER) {
+        if (hasGlobalLimit) {
+            // only need row number if has global limit, so we change 
algorithm directly
+            topNAlgorithm = TopNAlgorithm.ROW_NUMBER;
+        } else if (function == WindowFuncType.ROW_NUMBER) {
             topNAlgorithm = TopNAlgorithm.ROW_NUMBER;
         } else if (function == WindowFuncType.RANK) {
             topNAlgorithm = TopNAlgorithm.RANK;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to