This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ac0322dc746 [Enhancement](sort) change priority_queue to ck SortingQueue (#45952)
ac0322dc746 is described below
commit ac0322dc7461df147367a20ed6499802bcd5c387
Author: Pxl <[email protected]>
AuthorDate: Tue Jan 7 14:56:09 2025 +0800
[Enhancement](sort) change priority_queue to ck SortingQueue (#45952)
1. change priority_queue to ck SortingQueue (a heap which supports modifying the top element; see the sketch below)
2. avoid some convert_if_const usage
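For context on point 1: below is a minimal, self-contained sketch (names such as Run, sift_down and merge_runs are illustrative only, not the Doris classes) of the technique the new SortingQueue relies on. When the top cursor advances, a single sift-down restores heap order in place, whereas std::priority_queue would require a pop followed by a push.

#include <cstddef>
#include <iostream>
#include <utility>
#include <vector>

struct Run {
    const std::vector<int>* data = nullptr;
    size_t pos = 0;
    int value() const { return (*data)[pos]; }
    bool is_last() const { return pos + 1 >= data->size(); }
};

// Restore the min-heap property after heap[i] changed (same idea as the
// libc++ __sift_down that the commit's SortingQueueImpl adapts).
void sift_down(std::vector<Run>& heap, size_t i) {
    const size_t n = heap.size();
    while (true) {
        size_t smallest = i;
        const size_t l = 2 * i + 1;
        const size_t r = 2 * i + 2;
        if (l < n && heap[l].value() < heap[smallest].value()) smallest = l;
        if (r < n && heap[r].value() < heap[smallest].value()) smallest = r;
        if (smallest == i) return;
        std::swap(heap[i], heap[smallest]);
        i = smallest;
    }
}

std::vector<int> merge_runs(const std::vector<std::vector<int>>& runs) {
    std::vector<Run> heap;
    for (const auto& r : runs) {
        if (!r.empty()) heap.push_back({&r, 0});
    }
    for (size_t i = heap.size() / 2; i-- > 0;) sift_down(heap, i); // heapify

    std::vector<int> out;
    while (!heap.empty()) {
        out.push_back(heap.front().value());
        if (!heap.front().is_last()) {
            ++heap.front().pos; // modify the top cursor in place ...
            sift_down(heap, 0); // ... and sift down once, no pop + push
        } else {
            std::swap(heap.front(), heap.back());
            heap.pop_back();
            if (!heap.empty()) sift_down(heap, 0);
        }
    }
    return out;
}

int main() {
    for (int v : merge_runs({{1, 4, 7}, {2, 5, 8}, {3, 6, 9}})) std::cout << v << ' ';
    std::cout << '\n'; // prints: 1 2 3 4 5 6 7 8 9
    return 0;
}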

---
be/src/vec/columns/column.h | 1 -
be/src/vec/common/pod_array.h | 8 +-
be/src/vec/common/sort/partition_sorter.cpp | 197 +++++++--------
be/src/vec/common/sort/partition_sorter.h | 19 +-
be/src/vec/common/sort/sorter.cpp | 107 ++++----
be/src/vec/common/sort/sorter.h | 37 +--
be/src/vec/core/sort_cursor.h | 280 +++++++++++++++++++--
.../apache/doris/planner/PartitionSortNode.java | 5 +-
8 files changed, 460 insertions(+), 194 deletions(-)
diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index 729e5470c97..bc8ab101ecf 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -196,7 +196,6 @@ public:
/// Appends range of elements from other column with the same type.
/// Could be used to concatenate columns.
- /// TODO: we need `insert_range_from_const` for every column type.
virtual void insert_range_from(const IColumn& src, size_t start, size_t length) = 0;
/// Appends range of elements from other column with the same type.
diff --git a/be/src/vec/common/pod_array.h b/be/src/vec/common/pod_array.h
index f798ca69bd6..c55dc0cc33e 100644
--- a/be/src/vec/common/pod_array.h
+++ b/be/src/vec/common/pod_array.h
@@ -355,14 +355,14 @@ public:
/// The index is signed to access -1th element without pointer overflow.
T& operator[](ssize_t n) {
/// <= size, because taking address of one element past memory range is Ok in C++ (expression like &arr[arr.size()] is perfectly valid).
- assert((n >= (static_cast<ssize_t>(pad_left_) ? -1 : 0)) &&
- (n <= static_cast<ssize_t>(this->size())));
+ DCHECK_GE(n, (static_cast<ssize_t>(pad_left_) ? -1 : 0));
+ DCHECK_LE(n, static_cast<ssize_t>(this->size()));
return t_start()[n];
}
const T& operator[](ssize_t n) const {
- assert((n >= (static_cast<ssize_t>(pad_left_) ? -1 : 0)) &&
- (n <= static_cast<ssize_t>(this->size())));
+ DCHECK_GE(n, (static_cast<ssize_t>(pad_left_) ? -1 : 0));
+ DCHECK_LE(n, static_cast<ssize_t>(this->size()));
return t_start()[n];
}
diff --git a/be/src/vec/common/sort/partition_sorter.cpp b/be/src/vec/common/sort/partition_sorter.cpp
index c47a8a88952..f6a3cb443aa 100644
--- a/be/src/vec/common/sort/partition_sorter.cpp
+++ b/be/src/vec/common/sort/partition_sorter.cpp
@@ -49,9 +49,11 @@ PartitionSorter::PartitionSorter(VSortExecExprs& vsort_exec_exprs, int limit, in
: Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order, nulls_first),
_state(MergeSorterState::create_unique(row_desc, offset, limit, state, profile)),
_row_desc(row_desc),
- _has_global_limit(has_global_limit),
_partition_inner_limit(partition_inner_limit),
- _top_n_algorithm(top_n_algorithm),
+ _top_n_algorithm(
+ has_global_limit
+ ? TopNAlgorithm::ROW_NUMBER
+ : top_n_algorithm), // FE will make this modification, but still maintain this code for compatibility
_previous_row(previous_row) {}
Status PartitionSorter::append_block(Block* input_block) {
@@ -64,10 +66,13 @@ Status PartitionSorter::append_block(Block* input_block) {
Status PartitionSorter::prepare_for_read() {
auto& blocks = _state->get_sorted_block();
- auto& priority_queue = _state->get_priority_queue();
+ auto& queue = _state->get_queue();
+ std::vector<MergeSortCursor> cursors;
for (auto& block : blocks) {
- priority_queue.emplace(MergeSortCursorImpl::create_shared(block, _sort_description));
+ cursors.emplace_back(
+ MergeSortCursorImpl::create_shared(std::move(block), _sort_description));
}
+ queue = MergeSorterQueue(cursors);
blocks.clear();
return Status::OK();
}
@@ -88,122 +93,114 @@ void PartitionSorter::reset_sorter_state(RuntimeState* runtime_state) {
}
Status PartitionSorter::get_next(RuntimeState* state, Block* block, bool* eos) {
- if (_state->get_priority_queue().empty()) {
- *eos = true;
- } else if (_state->get_priority_queue().size() == 1 && _has_global_limit) {
- block->swap(*_state->get_priority_queue().top().impl->block);
- block->set_num_rows(_partition_inner_limit);
- *eos = true;
+ if (_top_n_algorithm == TopNAlgorithm::ROW_NUMBER) {
+ return _read_row_num(block, eos, state->batch_size());
} else {
- RETURN_IF_ERROR(partition_sort_read(block, eos, state->batch_size()));
+ return _read_row_rank(block, eos, state->batch_size());
}
- return Status::OK();
}
-Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int batch_size) {
- auto& priority_queue = _state->get_priority_queue();
- const auto& sorted_block = priority_queue.top().impl->block;
- size_t num_columns = sorted_block->columns();
+Status PartitionSorter::_read_row_num(Block* output_block, bool* eos, int batch_size) {
+ auto& queue = _state->get_queue();
+ size_t num_columns = _state->unsorted_block()->columns();
+
MutableBlock m_block =
- VectorizedUtils::build_mutable_mem_reuse_block(output_block, *sorted_block);
+ VectorizedUtils::build_mutable_mem_reuse_block(output_block, *_state->unsorted_block());
MutableColumns& merged_columns = m_block.mutable_columns();
- size_t current_output_rows = 0;
-
- bool get_enough_data = false;
- while (!priority_queue.empty()) {
- auto current = priority_queue.top();
- priority_queue.pop();
- if (UNLIKELY(_previous_row->impl == nullptr)) {
- *_previous_row = current;
+ size_t merged_rows = 0;
+
+ Defer defer {[&]() {
+ if (merged_rows == 0 || _get_enough_data()) {
+ *eos = true;
}
+ }};
- switch (_top_n_algorithm) {
- case TopNAlgorithm::ROW_NUMBER: {
- //1 row_number no need to check distinct, just output partition_inner_limit row
- if ((current_output_rows + _output_total_rows) < _partition_inner_limit) {
- for (size_t i = 0; i < num_columns; ++i) {
- merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
- }
- } else {
- //rows has get enough
- get_enough_data = true;
+ while (queue.is_valid() && merged_rows < batch_size && !_get_enough_data()) {
+ auto [current, current_rows] = queue.current();
+
+ // row_number no need to check distinct, just output partition_inner_limit row
+ size_t needed_rows = _partition_inner_limit - _output_total_rows;
+ size_t step = std::min(needed_rows, std::min(current_rows, batch_size - merged_rows));
+
+ if (current->impl->is_last(step) && current->impl->pos == 0) {
+ if (merged_rows != 0) {
+ // return directly for next time's read swap whole block
+ return Status::OK();
}
- current_output_rows++;
- break;
+ // swap and return block directly when we should get all data from cursor
+ output_block->swap(*current->impl->block);
+ merged_rows += step;
+ _output_total_rows += step;
+ queue.remove_top();
+ return Status::OK();
}
- case TopNAlgorithm::DENSE_RANK: {
- // dense_rank(): 1,1,1,2,2,2,2,.......,2,3,3,3, if SQL: where rk < 3, need output all 1 and 2
- //3 dense_rank() maybe need distinct rows of partition_inner_limit
- //3.1 _has_global_limit = true, so check (current_output_rows + _output_total_rows) >= _partition_inner_limit)
- //3.2 _has_global_limit = false. so check have output distinct rows, not _output_total_rows
- if (_has_global_limit &&
- (current_output_rows + _output_total_rows) >= _partition_inner_limit) {
- get_enough_data = true;
- break;
- }
- if (_has_global_limit) {
- current_output_rows++;
- } else {
- bool cmp_res = _previous_row->compare_two_rows(current);
- //get a distinct row
- if (cmp_res == false) {
- _output_distinct_rows++; //need rows++ firstly
- if (_output_distinct_rows >= _partition_inner_limit) {
- get_enough_data = true;
- break;
- }
- *_previous_row = current;
- }
- }
+
+ if (step) {
+ merged_rows += step;
+ _output_total_rows += step;
for (size_t i = 0; i < num_columns; ++i) {
- merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
+ merged_columns[i]->insert_range_from(*current->impl->columns[i], current->impl->pos,
+ step);
}
- break;
}
- case TopNAlgorithm::RANK: {
- // rank(): 1,1,1,4,5,6,6,6.....,6,100,101. if SQL where rk < 7, need output all 1,1,1,4,5,6,6,....6
- //2 rank() maybe need check when have get a distinct row
- //2.1 _has_global_limit = true: (current_output_rows + _output_total_rows) >= _partition_inner_limit)
- //2.2 _has_global_limit = false: so when the cmp_res is get a distinct row, need check have output all rows num
- if (_has_global_limit &&
- (current_output_rows + _output_total_rows) >= _partition_inner_limit) {
- get_enough_data = true;
- break;
- }
- bool cmp_res = _previous_row->compare_two_rows(current);
- //get a distinct row
- if (cmp_res == false) {
- //here must be check distinct of two rows, and then check nums of row
- if ((current_output_rows + _output_total_rows) >= _partition_inner_limit) {
- get_enough_data = true;
- break;
+
+ if (!current->impl->is_last(step)) {
+ queue.next(step);
+ } else {
+ queue.remove_top();
+ }
+ }
+
+ return Status::OK();
+}
+
+Status PartitionSorter::_read_row_rank(Block* output_block, bool* eos, int batch_size) {
+ auto& queue = _state->get_queue();
+ size_t num_columns = _state->unsorted_block()->columns();
+
+ MutableBlock m_block =
+ VectorizedUtils::build_mutable_mem_reuse_block(output_block, *_state->unsorted_block());
+ MutableColumns& merged_columns = m_block.mutable_columns();
+ size_t merged_rows = 0;
+
+ Defer defer {[&]() {
+ if (merged_rows == 0 || _get_enough_data()) {
+ *eos = true;
+ }
+ }};
+
+ while (queue.is_valid() && merged_rows < batch_size) {
+ auto [current, current_rows] = queue.current();
+
+ for (size_t offset = 0; offset < current_rows && merged_rows < batch_size; offset++) {
+ bool cmp_res = _previous_row->impl && _previous_row->compare_two_rows(current->impl);
+ if (!cmp_res) {
+ // 1. dense_rank(): 1,1,1,2,2,2,2,.......,2,3,3,3, if SQL: where rk < 3, need output all 1 and 2
+ // dense_rank() maybe need distinct rows of partition_inner_limit
+ // so check have output distinct rows, not _output_total_rows
+ // 2. rank(): 1,1,1,4,5,6,6,6.....,6,100,101. if SQL where rk < 7, need output all 1,1,1,4,5,6,6,....6
+ // rank() maybe need check when have get a distinct row
+ // so when the cmp_res is get a distinct row, need check have output all rows num
+ if (_get_enough_data()) {
+ return Status::OK();
}
- *_previous_row = current;
+ *_previous_row = *current;
+ _output_distinct_rows++;
}
for (size_t i = 0; i < num_columns; ++i) {
- merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
+ merged_columns[i]->insert_from(*current->impl->block->get_columns()[i],
+ current->impl->pos);
+ }
+ merged_rows++;
+ _output_total_rows++;
+ if (!current->impl->is_last(1)) {
+ queue.next(1);
+ } else {
+ queue.remove_top();
}
- current_output_rows++;
- break;
- }
- default:
- break;
- }
-
- if (!current->is_last()) {
- current->next();
- priority_queue.push(current);
- }
-
- if (current_output_rows == batch_size || get_enough_data == true) {
- break;
}
}
- _output_total_rows += output_block->rows();
- if (current_output_rows == 0 || get_enough_data == true) {
- *eos = true;
- }
return Status::OK();
}
diff --git a/be/src/vec/common/sort/partition_sorter.h b/be/src/vec/common/sort/partition_sorter.h
index 0939dcd40cd..053b3aa1a29 100644
--- a/be/src/vec/common/sort/partition_sorter.h
+++ b/be/src/vec/common/sort/partition_sorter.h
@@ -90,17 +90,30 @@ public:
Status get_next(RuntimeState* state, Block* block, bool* eos) override;
size_t data_size() const override { return _state->data_size(); }
-
- Status partition_sort_read(Block* block, bool* eos, int batch_size);
int64 get_output_rows() const { return _output_total_rows; }
void reset_sorter_state(RuntimeState* runtime_state);
private:
+ Status _read_row_num(Block* block, bool* eos, int batch_size);
+ Status _read_row_rank(Block* block, bool* eos, int batch_size);
+ bool _get_enough_data() const {
+ if (_top_n_algorithm == TopNAlgorithm::DENSE_RANK) {
+ // dense_rank(): 1,1,1,2,2,2,2,.......,2,3,3,3, if SQL: where rk < 3, need output all 1 and 2
+ // dense_rank() maybe need distinct rows of partition_inner_limit
+ // so check have output distinct rows, not _output_total_rows
+ return _output_distinct_rows >= _partition_inner_limit;
+ } else {
+ // rank(): 1,1,1,4,5,6,6,6.....,6,100,101. if SQL where rk < 7, need output all 1,1,1,4,5,6,6,....6
+ // rank() maybe need check when have get a distinct row
+ // so when the cmp_res is get a distinct row, need check have output all rows num
+ return _output_total_rows >= _partition_inner_limit;
+ }
+ }
+
std::unique_ptr<MergeSorterState> _state;
const RowDescriptor& _row_desc;
int64 _output_total_rows = 0;
int64 _output_distinct_rows = 0;
- bool _has_global_limit = false;
int _partition_inner_limit = 0;
TopNAlgorithm::type _top_n_algorithm = TopNAlgorithm::type::ROW_NUMBER;
SortCursorCmp* _previous_row = nullptr;
diff --git a/be/src/vec/common/sort/sorter.cpp b/be/src/vec/common/sort/sorter.cpp
index 4f7de1d379a..0fddcf01182 100644
--- a/be/src/vec/common/sort/sorter.cpp
+++ b/be/src/vec/common/sort/sorter.cpp
@@ -57,13 +57,11 @@ namespace doris::vectorized {
//
void MergeSorterState::reset() {
- auto empty_queue = std::priority_queue<MergeSortCursor>();
- priority_queue_.swap(empty_queue);
std::vector<std::shared_ptr<MergeSortCursorImpl>> empty_cursors(0);
std::vector<std::shared_ptr<Block>> empty_blocks(0);
- sorted_blocks_.swap(empty_blocks);
- unsorted_block_ = Block::create_unique(unsorted_block_->clone_empty());
- in_mem_sorted_bocks_size_ = 0;
+ _sorted_blocks.swap(empty_blocks);
+ unsorted_block() = Block::create_unique(unsorted_block()->clone_empty());
+ _in_mem_sorted_bocks_size = 0;
}
void MergeSorterState::add_sorted_block(std::shared_ptr<Block> block) {
@@ -71,72 +69,80 @@ void MergeSorterState::add_sorted_block(std::shared_ptr<Block> block) {
if (0 == rows) {
return;
}
- in_mem_sorted_bocks_size_ += block->bytes();
- sorted_blocks_.emplace_back(block);
- num_rows_ += rows;
+ _in_mem_sorted_bocks_size += block->bytes();
+ _sorted_blocks.emplace_back(block);
+ _num_rows += rows;
}
Status MergeSorterState::build_merge_tree(const SortDescription& sort_description) {
- for (auto& block : sorted_blocks_) {
- priority_queue_.emplace(
+ std::vector<MergeSortCursor> cursors;
+ for (auto& block : _sorted_blocks) {
+ cursors.emplace_back(
MergeSortCursorImpl::create_shared(std::move(block), sort_description));
}
+ _queue = MergeSorterQueue(cursors);
- sorted_blocks_.clear();
+ _sorted_blocks.clear();
return Status::OK();
}
Status MergeSorterState::merge_sort_read(doris::vectorized::Block* block, int batch_size,
bool* eos) {
- DCHECK(sorted_blocks_.empty());
- DCHECK(unsorted_block_->empty());
- if (priority_queue_.empty()) {
- *eos = true;
- } else if (priority_queue_.size() == 1) {
- if (offset_ != 0 || priority_queue_.top()->pos != 0) {
- // Skip rows already returned or need to be ignored
- int64_t offset = offset_ + (int64_t)priority_queue_.top()->pos;
- priority_queue_.top().impl->block->skip_num_rows(offset);
- }
- block->swap(*priority_queue_.top().impl->block);
- *eos = true;
- } else {
- RETURN_IF_ERROR(_merge_sort_read_impl(batch_size, block, eos));
- }
+ DCHECK(_sorted_blocks.empty());
+ DCHECK(unsorted_block()->empty());
+ RETURN_IF_ERROR(_merge_sort_read_impl(batch_size, block, eos));
return Status::OK();
}
Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized::Block* block,
bool* eos) {
- size_t num_columns = priority_queue_.top().impl->block->columns();
+ size_t num_columns = unsorted_block()->columns();
- MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block(
- block, *priority_queue_.top().impl->block);
+ MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block(block, *unsorted_block());
MutableColumns& merged_columns = m_block.mutable_columns();
/// Take rows from queue in right order and push to 'merged'.
size_t merged_rows = 0;
// process single element queue on merge_sort_read()
- while (priority_queue_.size() > 1 && merged_rows < batch_size) {
- auto current = priority_queue_.top();
- priority_queue_.pop();
+ while (_queue.is_valid() && merged_rows < batch_size) {
+ auto [current, current_rows] = _queue.current();
+ current_rows = std::min(current_rows, batch_size - merged_rows);
+
+ size_t step = std::min(_offset, current_rows);
+ _offset -= step;
+ current_rows -= step;
+
+ if (current->impl->is_last(current_rows + step) && current->impl->pos == 0 && step == 0) {
+ if (merged_rows != 0) {
+ // return directly for next time's read swap whole block
+ return Status::OK();
+ }
+ // swap and return block directly when we should get all data from cursor
+ block->swap(*current->impl->block);
+ _queue.remove_top();
+ return Status::OK();
+ }
- if (offset_ == 0) {
+ if (current_rows) {
for (size_t i = 0; i < num_columns; ++i) {
- merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
+ merged_columns[i]->insert_range_from(*current->impl->columns[i],
+ current->impl->pos + step, current_rows);
}
- ++merged_rows;
- } else {
- offset_--;
+ merged_rows += current_rows;
}
- if (!current->is_last()) {
- current->next();
- priority_queue_.push(current);
+ if (!current->impl->is_last(current_rows + step)) {
+ _queue.next(current_rows + step);
+ } else {
+ _queue.remove_top();
}
}
block->set_columns(std::move(merged_columns));
+
+ if (merged_rows == 0) {
+ *eos = true;
+ }
return Status::OK();
}
@@ -207,23 +213,28 @@ FullSorter::FullSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offs
Status FullSorter::append_block(Block* block) {
DCHECK(block->rows() > 0);
- if (_reach_limit() && block->bytes() > _state->unsorted_block_->allocated_bytes() -
- _state->unsorted_block_->bytes()) {
+ if (_reach_limit() && block->bytes() > _state->unsorted_block()->allocated_bytes() -
+ _state->unsorted_block()->bytes()) {
RETURN_IF_ERROR(_do_sort());
}
{
SCOPED_TIMER(_merge_block_timer);
- const auto& data = _state->unsorted_block_->get_columns_with_type_and_name();
+ const auto& data = _state->unsorted_block()->get_columns_with_type_and_name();
const auto& arrival_data = block->get_columns_with_type_and_name();
auto sz = block->rows();
for (int i = 0; i < data.size(); ++i) {
DCHECK(data[i].type->equals(*(arrival_data[i].type)))
<< " type1: " << data[i].type->get_name()
<< " type2: " << arrival_data[i].type->get_name() << " i: " << i;
- //TODO: to eliminate unnecessary expansion, we need a `insert_range_from_const` for every column type.
- data[i].column->assume_mutable()->insert_range_from(
- *arrival_data[i].column->convert_to_full_column_if_const(), 0, sz);
+ if (is_column_const(*arrival_data[i].column)) {
+ data[i].column->assume_mutable()->insert_many_from(
+ assert_cast<const ColumnConst*>(arrival_data[i].column.get())
+ ->get_data_column(),
+ 0, sz);
+ } else {
+ data[i].column->assume_mutable()->insert_range_from(*arrival_data[i].column, 0, sz);
+ }
}
block->clear_column_data();
}
@@ -231,7 +242,7 @@ Status FullSorter::append_block(Block* block) {
}
Status FullSorter::prepare_for_read() {
- if (_state->unsorted_block_->rows() > 0) {
+ if (_state->unsorted_block()->rows() > 0) {
RETURN_IF_ERROR(_do_sort());
}
return _state->build_merge_tree(_sort_description);
@@ -247,7 +258,7 @@ Status FullSorter::merge_sort_read_for_spill(RuntimeState* state, doris::vectori
}
Status FullSorter::_do_sort() {
- Block* src_block = _state->unsorted_block_.get();
+ Block* src_block = _state->unsorted_block().get();
Block desc_block = src_block->clone_without_columns();
RETURN_IF_ERROR(partial_sort(*src_block, desc_block));
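Regarding point 2 of the commit message, the FullSorter::append_block hunk above now branches on is_column_const and appends the const column's single stored value sz times via insert_many_from, instead of materializing a full column with convert_to_full_column_if_const. A hedged, standalone analogue of that idea (toy types, not the doris::vectorized column classes):

#include <cstddef>
#include <iostream>
#include <vector>

struct ToyColumn {
    std::vector<int> values; // full (materialized) representation
    void insert_range_from(const ToyColumn& src, size_t start, size_t length) {
        values.insert(values.end(), src.values.begin() + start, src.values.begin() + start + length);
    }
    void insert_many_from(int value, size_t n) { values.insert(values.end(), n, value); }
};

struct ToyConstColumn {
    int value = 0;   // one stored value ...
    size_t size = 0; // ... with a logical row count
    ToyColumn to_full_column() const { // the temporary allocation we want to avoid
        return ToyColumn{std::vector<int>(size, value)};
    }
};

int main() {
    ToyColumn dst{{1, 2, 3}};
    ToyConstColumn src{42, 4};

    // Before: materialize the const column, then bulk-copy (extra temporary of `size` rows).
    ToyColumn full = src.to_full_column();
    dst.insert_range_from(full, 0, full.values.size());

    // After: append the single stored value `size` times, with no temporary column.
    dst.insert_many_from(src.value, src.size);

    for (int v : dst.values) std::cout << v << ' '; // 1 2 3 followed by eight 42s
    std::cout << '\n';
    return 0;
}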
diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h
index 36c535c9101..69e70ab869c 100644
--- a/be/src/vec/common/sort/sorter.h
+++ b/be/src/vec/common/sort/sorter.h
@@ -43,6 +43,8 @@ class RowDescriptor;
namespace doris::vectorized {
+using MergeSorterQueue = SortingQueueBatch<MergeSortCursor>;
+
// TODO: now we only use merge sort
class MergeSorterState {
ENABLE_FACTORY_CREATOR(MergeSorterState);
@@ -53,9 +55,9 @@ public:
// create_empty_block should ignore invalid slots, unsorted_block
// should be same structure with arrival block from child node
// since block from child node may ignored these slots
- : unsorted_block_(Block::create_unique(
+ : _unsorted_block(Block::create_unique(
VectorizedUtils::create_empty_block(row_desc, true /*ignore invalid slot*/))),
- offset_(offset) {}
+ _offset(offset) {}
~MergeSorterState() = default;
@@ -66,32 +68,33 @@ public:
Status merge_sort_read(doris::vectorized::Block* block, int batch_size, bool* eos);
size_t data_size() const {
- size_t size = unsorted_block_->bytes();
- return size + in_mem_sorted_bocks_size_;
+ size_t size = _unsorted_block->bytes();
+ return size + _in_mem_sorted_bocks_size;
}
- uint64_t num_rows() const { return num_rows_; }
+ uint64_t num_rows() const { return _num_rows; }
- std::shared_ptr<Block> last_sorted_block() { return sorted_blocks_.back(); }
+ std::shared_ptr<Block> last_sorted_block() { return _sorted_blocks.back(); }
- std::vector<std::shared_ptr<Block>>& get_sorted_block() { return sorted_blocks_; }
- std::priority_queue<MergeSortCursor>& get_priority_queue() { return priority_queue_; }
+ std::vector<std::shared_ptr<Block>>& get_sorted_block() { return _sorted_blocks; }
+ MergeSorterQueue& get_queue() { return _queue; }
void reset();
- std::unique_ptr<Block> unsorted_block_;
+ std::unique_ptr<Block>& unsorted_block() { return _unsorted_block; }
private:
Status _merge_sort_read_impl(int batch_size, doris::vectorized::Block* block, bool* eos);
- std::priority_queue<MergeSortCursor> priority_queue_;
- std::vector<std::shared_ptr<Block>> sorted_blocks_;
- size_t in_mem_sorted_bocks_size_ = 0;
- uint64_t num_rows_ = 0;
+ std::unique_ptr<Block> _unsorted_block;
+ MergeSorterQueue _queue;
+ std::vector<std::shared_ptr<Block>> _sorted_blocks;
+ size_t _in_mem_sorted_bocks_size = 0;
+ uint64_t _num_rows = 0;
- int64_t offset_;
+ size_t _offset;
- Block merge_sorted_block_;
- std::unique_ptr<VSortedRunMerger> merger_;
+ Block _merge_sorted_block;
+ std::unique_ptr<VSortedRunMerger> _merger;
};
class Sorter {
@@ -177,7 +180,7 @@ public:
private:
bool _reach_limit() {
- return _state->unsorted_block_->allocated_bytes() >= buffered_block_bytes_;
+ return _state->unsorted_block()->allocated_bytes() >= buffered_block_bytes_;
}
Status _do_sort();
diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h
index d31767f46e4..a37b6feb21e 100644
--- a/be/src/vec/core/sort_cursor.h
+++ b/be/src/vec/core/sort_cursor.h
@@ -20,6 +20,8 @@
#pragma once
+#include <utility>
+
#include "vec/columns/column.h"
#include "vec/core/block.h"
#include "vec/core/sort_description.h"
@@ -48,8 +50,7 @@ private:
void _reset() {
sort_columns.clear();
auto columns = block.get_columns_and_convert();
- for (size_t j = 0, size = desc.size(); j < size; ++j) {
- auto& column_desc = desc[j];
+ for (auto& column_desc : desc) {
size_t column_number = !column_desc.column_name.empty()
? block.get_position_by_name(column_desc.column_name)
: column_desc.column_number;
@@ -63,7 +64,7 @@ using HeapSortCursorBlockSPtr = std::shared_ptr<HeapSortCursorBlockView>;
struct HeapSortCursorImpl {
public:
HeapSortCursorImpl(int row_id, HeapSortCursorBlockSPtr block_view)
- : _row_id(row_id), _block_view(block_view) {}
+ : _row_id(row_id), _block_view(std::move(block_view)) {}
HeapSortCursorImpl(const HeapSortCursorImpl& other) {
_row_id = other._row_id;
@@ -123,6 +124,7 @@ struct MergeSortCursorImpl {
ENABLE_FACTORY_CREATOR(MergeSortCursorImpl);
std::shared_ptr<Block> block;
ColumnRawPtrs sort_columns;
+ ColumnRawPtrs columns;
SortDescription desc;
size_t sort_columns_size = 0;
size_t pos = 0;
@@ -131,26 +133,33 @@ struct MergeSortCursorImpl {
MergeSortCursorImpl() = default;
virtual ~MergeSortCursorImpl() = default;
- MergeSortCursorImpl(std::shared_ptr<Block> block_, const SortDescription& desc_)
- : block(block_), desc(desc_), sort_columns_size(desc.size()) {
+ MergeSortCursorImpl(std::shared_ptr<Block> block_, SortDescription desc_)
+ : block(std::move(block_)), desc(std::move(desc_)), sort_columns_size(desc.size()) {
reset();
}
- MergeSortCursorImpl(const SortDescription& desc_)
- : block(Block::create_shared()), desc(desc_), sort_columns_size(desc.size()) {}
+ MergeSortCursorImpl(SortDescription desc_)
+ : block(Block::create_shared()),
+ desc(std::move(desc_)),
+ sort_columns_size(desc.size()) {}
+
bool empty() const { return rows == 0; }
/// Set the cursor to the beginning of the new block.
void reset() {
sort_columns.clear();
+ columns.clear();
- auto columns = block->get_columns_and_convert();
- for (size_t j = 0, size = desc.size(); j < size; ++j) {
- auto& column_desc = desc[j];
+ auto tmp_columns = block->get_columns_and_convert();
+ columns.reserve(tmp_columns.size());
+ for (auto col : tmp_columns) {
+ columns.push_back(col.get());
+ }
+ for (auto& column_desc : desc) {
size_t column_number = !column_desc.column_name.empty()
? block->get_position_by_name(column_desc.column_name)
: column_desc.column_number;
- sort_columns.push_back(columns[column_number].get());
+ sort_columns.push_back(columns[column_number]);
}
pos = 0;
@@ -158,8 +167,9 @@ struct MergeSortCursorImpl {
}
bool is_first() const { return pos == 0; }
- bool is_last() const { return pos + 1 >= rows; }
- void next() { ++pos; }
+ bool is_last(size_t size = 1) const { return pos + size >= rows; }
+ void next(size_t size = 1) { pos += size; }
+ size_t get_size() const { return rows; }
virtual bool has_next_block() { return false; }
virtual Block* block_ptr() { return nullptr; }
@@ -169,11 +179,11 @@ using BlockSupplier = std::function<Status(Block*, bool* eos)>;
struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl {
ENABLE_FACTORY_CREATOR(BlockSupplierSortCursorImpl);
- BlockSupplierSortCursorImpl(const BlockSupplier& block_supplier,
+ BlockSupplierSortCursorImpl(BlockSupplier block_supplier,
const VExprContextSPtrs& ordering_expr,
const std::vector<bool>& is_asc_order,
const std::vector<bool>& nulls_first)
- : _ordering_expr(ordering_expr), _block_supplier(block_supplier) {
+ : _ordering_expr(ordering_expr), _block_supplier(std::move(block_supplier)) {
block = Block::create_shared();
sort_columns_size = ordering_expr.size();
@@ -185,8 +195,8 @@ struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl {
_is_eof = !has_next_block();
}
- BlockSupplierSortCursorImpl(const BlockSupplier& block_supplier, const SortDescription& desc_)
- : MergeSortCursorImpl(desc_), _block_supplier(block_supplier) {
+ BlockSupplierSortCursorImpl(BlockSupplier block_supplier, const SortDescription& desc_)
+ : MergeSortCursorImpl(desc_), _block_supplier(std::move(block_supplier)) {
_is_eof = !has_next_block();
}
@@ -202,7 +212,7 @@ struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl {
// If status not ok, upper callers could not detect whether it is eof or error.
// So that fatal here, and should throw exception in the future.
if (status.ok() && !block->empty()) {
- if (_ordering_expr.size() > 0) {
+ if (!_ordering_expr.empty()) {
for (int i = 0; status.ok() && i < desc.size(); ++i) {
// TODO yiguolei: throw exception if status not ok in the future
status = _ordering_expr[i]->execute(block.get(), &desc[i].column_number);
@@ -233,7 +243,7 @@ struct MergeSortCursor {
ENABLE_FACTORY_CREATOR(MergeSortCursor);
std::shared_ptr<MergeSortCursorImpl> impl;
- MergeSortCursor(std::shared_ptr<MergeSortCursorImpl> impl_) : impl(impl_) {}
+ MergeSortCursor(std::shared_ptr<MergeSortCursorImpl> impl_) : impl(std::move(impl_)) {}
MergeSortCursorImpl* operator->() const { return impl.get(); }
/// The specified row of this cursor is greater than the specified row of another cursor.
@@ -264,6 +274,21 @@ struct MergeSortCursor {
return greater_at(rhs, impl->rows - 1, 0) == -1;
}
+ /// Checks that all rows in the current block of this cursor are less than or equal to all the rows of the current block of another cursor.
+ bool totally_less_or_equals(const MergeSortCursor& rhs) const {
+ if (impl->rows == 0 || rhs.impl->rows == 0) {
+ return false;
+ }
+
+ /// The last row of this cursor is no larger than the first row of the another cursor.
+ return greater_at(rhs, impl->rows - 1, rhs->pos) <= 0;
+ }
+
+ bool greater_with_offset(const MergeSortCursor& rhs, size_t lhs_offset, size_t rhs_offset) const {
+ return greater_at(rhs, impl->pos + lhs_offset, rhs.impl->pos + rhs_offset) > 0;
+ }
+
bool greater(const MergeSortCursor& rhs) const {
return !impl->empty() && greater_at(rhs, impl->pos, rhs.impl->pos) > 0;
}
@@ -277,7 +302,7 @@ struct MergeSortBlockCursor {
ENABLE_FACTORY_CREATOR(MergeSortBlockCursor);
std::shared_ptr<MergeSortCursorImpl> impl = nullptr;
- MergeSortBlockCursor(std::shared_ptr<MergeSortCursorImpl> impl_) : impl(impl_) {}
+ MergeSortBlockCursor(std::shared_ptr<MergeSortCursorImpl> impl_) : impl(std::move(impl_)) {}
MergeSortCursorImpl* operator->() const { return impl.get(); }
/// The specified row of this cursor is greater than the specified row of another cursor.
@@ -314,4 +339,219 @@ struct MergeSortBlockCursor {
}
};
+enum class SortingQueueStrategy : uint8_t { Default, Batch };
+
+/// Allows to fetch data from multiple sort cursors in sorted order (merging sorted data streams).
+template <typename Cursor, SortingQueueStrategy strategy>
+class SortingQueueImpl {
+public:
+ SortingQueueImpl() = default;
+
+ template <typename Cursors>
+ explicit SortingQueueImpl(Cursors& cursors) {
+ size_t size = cursors.size();
+ _queue.reserve(size);
+
+ for (size_t i = 0; i < size; ++i) {
+ _queue.emplace_back(cursors[i]);
+ }
+
+ std::make_heap(_queue.begin(), _queue.end());
+
+ if constexpr (strategy == SortingQueueStrategy::Batch) {
+ if (!_queue.empty()) {
+ update_batch_size();
+ }
+ }
+ }
+
+ bool is_valid() const { return !_queue.empty(); }
+
+ Cursor& current()
+ requires(strategy == SortingQueueStrategy::Default)
+ {
+ return &_queue.front();
+ }
+
+ std::pair<Cursor*, size_t> current()
+ requires(strategy == SortingQueueStrategy::Batch)
+ {
+ return {&_queue.front(), batch_size};
+ }
+
+ size_t size() { return _queue.size(); }
+
+ Cursor& next_child() { return _queue[next_child_index()]; }
+
+ void ALWAYS_INLINE next()
+ requires(strategy == SortingQueueStrategy::Default)
+ {
+ assert(is_valid());
+
+ if (!_queue.front()->is_last()) {
+ _queue.front()->next();
+ update_top(true);
+ } else {
+ remove_top();
+ }
+ }
+
+ void ALWAYS_INLINE next(size_t batch_size_value)
+ requires(strategy == SortingQueueStrategy::Batch)
+ {
+ assert(is_valid());
+ assert(batch_size_value <= batch_size);
+ assert(batch_size_value > 0);
+
+ batch_size -= batch_size_value;
+ if (batch_size > 0) {
+ _queue.front()->next(batch_size_value);
+ return;
+ }
+
+ if (!_queue.front()->is_last(batch_size_value)) {
+ _queue.front()->next(batch_size_value);
+ update_top(false);
+ } else {
+ remove_top();
+ }
+ }
+
+ void remove_top() {
+ std::pop_heap(_queue.begin(), _queue.end());
+ _queue.pop_back();
+ next_child_idx = 0;
+
+ if constexpr (strategy == SortingQueueStrategy::Batch) {
+ if (_queue.empty()) {
+ batch_size = 0;
+ } else {
+ update_batch_size();
+ }
+ }
+ }
+
+ void push(MergeSortCursorImpl& cursor) {
+ _queue.emplace_back(&cursor);
+ std::push_heap(_queue.begin(), _queue.end());
+ next_child_idx = 0;
+
+ if constexpr (strategy == SortingQueueStrategy::Batch) {
+ update_batch_size();
+ }
+ }
+
+private:
+ using Container = std::vector<Cursor>;
+ Container _queue;
+
+ /// Cache comparison between first and second child if the order in queue has not been changed.
+ size_t next_child_idx = 0;
+ size_t batch_size = 0;
+
+ size_t ALWAYS_INLINE next_child_index() {
+ if (next_child_idx == 0) {
+ next_child_idx = 1;
+
+ if (_queue.size() > 2 && _queue[1].greater(_queue[2])) {
+ ++next_child_idx;
+ }
+ }
+
+ return next_child_idx;
+ }
+
+ /// This is adapted version of the function __sift_down from libc++.
+ /// Why cannot simply use std::priority_queue?
+ /// - because it doesn't support updating the top element and requires pop and push instead.
+ /// Also look at "Boost.Heap" library.
+ void ALWAYS_INLINE update_top(bool check_in_order) {
+ size_t size = _queue.size();
+ if (size < 2) {
+ return;
+ }
+
+ auto begin = _queue.begin();
+
+ size_t child_idx = next_child_index();
+ auto child_it = begin + child_idx;
+
+ /// Check if we are in order.
+ if (check_in_order && (*child_it).greater(*begin)) {
+ if constexpr (strategy == SortingQueueStrategy::Batch) {
+ update_batch_size();
+ }
+ return;
+ }
+
+ next_child_idx = 0;
+
+ auto curr_it = begin;
+ auto top(std::move(*begin));
+ do {
+ /// We are not in heap-order, swap the parent with it's largest child.
+ *curr_it = std::move(*child_it);
+ curr_it = child_it;
+
+ // recompute the child based off of the updated parent
+ child_idx = 2 * child_idx + 1;
+
+ if (child_idx >= size) {
+ break;
+ }
+
+ child_it = begin + child_idx;
+
+ if ((child_idx + 1) < size && (*child_it).greater(*(child_it + 1))) {
+ /// Right child exists and is greater than left child.
+ ++child_it;
+ ++child_idx;
+ }
+
+ /// Check if we are in order.
+ } while (!((*child_it).greater(top)));
+ *curr_it = std::move(top);
+
+ if constexpr (strategy == SortingQueueStrategy::Batch) {
+ update_batch_size();
+ }
+ }
+
+ /// Update batch size of elements that client can extract from current cursor
+ void update_batch_size() {
+ DCHECK(!_queue.empty());
+
+ auto& begin_cursor = *_queue.begin();
+ size_t min_cursor_size = begin_cursor->get_size();
+ size_t min_cursor_pos = begin_cursor->pos;
+
+ if (_queue.size() == 1) {
+ batch_size = min_cursor_size - min_cursor_pos;
+ return;
+ }
+
+ batch_size = 1;
+ size_t child_idx = next_child_index();
+ auto& next_child_cursor = *(_queue.begin() + child_idx);
+ if (min_cursor_pos + batch_size < min_cursor_size &&
+ next_child_cursor.greater_with_offset(begin_cursor, 0, batch_size)) {
+ ++batch_size;
+ } else {
+ return;
+ }
+ if (begin_cursor.totally_less_or_equals(next_child_cursor)) {
+ batch_size = min_cursor_size - min_cursor_pos;
+ return;
+ }
+
+ while (min_cursor_pos + batch_size < min_cursor_size &&
+ next_child_cursor.greater_with_offset(begin_cursor, 0, batch_size)) {
+ ++batch_size;
+ }
+ }
+};
+template <typename Cursor>
+using SortingQueue = SortingQueueImpl<Cursor, SortingQueueStrategy::Default>;
+template <typename Cursor>
+using SortingQueueBatch = SortingQueueImpl<Cursor, SortingQueueStrategy::Batch>;
} // namespace doris::vectorized
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionSortNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionSortNode.java
index 20142e380ce..69a1b871d1b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionSortNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionSortNode.java
@@ -142,7 +142,10 @@ public class PartitionSortNode extends PlanNode {
Preconditions.checkState(tupleIds.size() == 1, "Incorrect size for tupleIds in PartitionSortNode");
TopNAlgorithm topNAlgorithm;
- if (function == WindowFuncType.ROW_NUMBER) {
+ if (hasGlobalLimit) {
+ // only need row number if has global limit, so we change algorithm directly
+ topNAlgorithm = TopNAlgorithm.ROW_NUMBER;
+ } else if (function == WindowFuncType.ROW_NUMBER) {
topNAlgorithm = TopNAlgorithm.ROW_NUMBER;
} else if (function == WindowFuncType.RANK) {
topNAlgorithm = TopNAlgorithm.RANK;
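As a closing illustration of the batch interface used by _merge_sort_read_impl and _read_row_num above (current() returning a cursor plus a run length, then next(step) or remove_top()), here is a hedged standalone analogue with illustrative names only; it assumes non-empty inputs and is not the Doris implementation:

#include <cstddef>
#include <iostream>
#include <utility>
#include <vector>

struct BatchRun {
    const std::vector<int>* data = nullptr;
    size_t pos = 0;
    size_t size() const { return data->size(); }
    bool is_last(size_t step) const { return pos + step >= size(); }
};

// How many leading rows of `top` can be emitted before `other`'s head is smaller.
size_t run_length(const BatchRun& top, const BatchRun& other) {
    size_t n = 1;
    while (top.pos + n < top.size() && (*top.data)[top.pos + n] <= (*other.data)[other.pos]) {
        ++n;
    }
    return n;
}

// Two-way merge that copies whole runs at once (assumes non-empty inputs).
std::vector<int> merge_two(const std::vector<int>& a, const std::vector<int>& b) {
    std::vector<BatchRun> q = {{&a, 0}, {&b, 0}};
    std::vector<int> out;
    while (!q.empty()) {
        // Keep the run with the smaller head at the front, like the heap top.
        if (q.size() == 2 && (*q[1].data)[q[1].pos] < (*q[0].data)[q[0].pos]) {
            std::swap(q[0], q[1]);
        }
        size_t step = (q.size() == 1) ? q[0].size() - q[0].pos : run_length(q[0], q[1]);
        // Bulk copy of `step` rows, mirroring insert_range_from(..., pos, step).
        out.insert(out.end(), q[0].data->begin() + q[0].pos, q[0].data->begin() + q[0].pos + step);
        if (!q[0].is_last(step)) {
            q[0].pos += step;   // queue.next(step)
        } else {
            q.erase(q.begin()); // queue.remove_top()
        }
    }
    return out;
}

int main() {
    for (int v : merge_two({1, 2, 3, 10, 11}, {4, 5, 6})) std::cout << v << ' ';
    std::cout << '\n'; // prints: 1 2 3 4 5 6 10 11
    return 0;
}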
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]