This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 1768169b9a0 Revert "[Improvement](sort) Free sort blocks if this block
is exhausted (#39306)" (#40211)
1768169b9a0 is described below
commit 1768169b9a034204491cbc3368d101977d55ec7a
Author: yiguolei <[email protected]>
AuthorDate: Sat Aug 31 15:58:55 2024 +0800
Revert "[Improvement](sort) Free sort blocks if this block is exhausted
(#39306)" (#40211)
Reverts apache/doris#39956
---
be/src/vec/common/sort/partition_sorter.cpp | 42 +++++++++--------
be/src/vec/common/sort/partition_sorter.h | 4 +-
be/src/vec/common/sort/sorter.cpp | 71 ++++++++++++++---------------
be/src/vec/common/sort/sorter.h | 12 +++--
be/src/vec/common/sort/topn_sorter.cpp | 17 +++----
be/src/vec/core/sort_cursor.h | 68 ++++++++++++++++-----------
be/src/vec/runtime/vsorted_run_merger.cpp | 34 ++++++++++----
be/src/vec/runtime/vsorted_run_merger.h | 13 ++++--
8 files changed, 149 insertions(+), 112 deletions(-)
diff --git a/be/src/vec/common/sort/partition_sorter.cpp
b/be/src/vec/common/sort/partition_sorter.cpp
index c363a41d1c7..1ea7c6de6a8 100644
--- a/be/src/vec/common/sort/partition_sorter.cpp
+++ b/be/src/vec/common/sort/partition_sorter.cpp
@@ -58,17 +58,20 @@ Status PartitionSorter::append_block(Block* input_block) {
Block sorted_block =
VectorizedUtils::create_empty_columnswithtypename(_row_desc);
DCHECK(input_block->columns() == sorted_block.columns());
RETURN_IF_ERROR(partial_sort(*input_block, sorted_block));
- _state->add_sorted_block(Block::create_shared(std::move(sorted_block)));
+ RETURN_IF_ERROR(_state->add_sorted_block(sorted_block));
return Status::OK();
}
Status PartitionSorter::prepare_for_read() {
+ auto& cursors = _state->get_cursors();
auto& blocks = _state->get_sorted_block();
auto& priority_queue = _state->get_priority_queue();
for (auto& block : blocks) {
- priority_queue.push(MergeSortCursorImpl::create_shared(block,
_sort_description));
+ cursors.emplace_back(block, _sort_description);
+ }
+ for (auto& cursor : cursors) {
+ priority_queue.push(MergeSortCursor(&cursor));
}
- blocks.clear();
return Status::OK();
}
@@ -81,30 +84,29 @@ void PartitionSorter::reset_sorter_state(RuntimeState*
runtime_state) {
}
Status PartitionSorter::get_next(RuntimeState* state, Block* block, bool* eos)
{
- if (_state->get_priority_queue().empty()) {
- *eos = true;
- } else if (_state->get_priority_queue().size() == 1 && _has_global_limit) {
- block->swap(*_state->get_priority_queue().top().impl->block);
- block->set_num_rows(_partition_inner_limit);
+ if (_state->get_sorted_block().empty()) {
*eos = true;
} else {
- RETURN_IF_ERROR(partition_sort_read(block, eos, state->batch_size()));
+ if (_state->get_sorted_block().size() == 1 && _has_global_limit) {
+ auto& sorted_block = _state->get_sorted_block()[0];
+ block->swap(sorted_block);
+ block->set_num_rows(_partition_inner_limit);
+ *eos = true;
+ } else {
+ RETURN_IF_ERROR(partition_sort_read(block, eos,
state->batch_size()));
+ }
}
return Status::OK();
}
Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos,
int batch_size) {
- auto& priority_queue = _state->get_priority_queue();
- if (priority_queue.empty()) {
- *eos = true;
- return Status::OK();
- }
- const auto& sorted_block = priority_queue.top().impl->block;
- size_t num_columns = sorted_block->columns();
+ const auto& sorted_block = _state->get_sorted_block()[0];
+ size_t num_columns = sorted_block.columns();
MutableBlock m_block =
- VectorizedUtils::build_mutable_mem_reuse_block(output_block,
*sorted_block);
+ VectorizedUtils::build_mutable_mem_reuse_block(output_block,
sorted_block);
MutableColumns& merged_columns = m_block.mutable_columns();
size_t current_output_rows = 0;
+ auto& priority_queue = _state->get_priority_queue();
bool get_enough_data = false;
while (!priority_queue.empty()) {
@@ -119,7 +121,7 @@ Status PartitionSorter::partition_sort_read(Block*
output_block, bool* eos, int
//1 row_number no need to check distinct, just output
partition_inner_limit row
if ((current_output_rows + _output_total_rows) <
_partition_inner_limit) {
for (size_t i = 0; i < num_columns; ++i) {
-
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
+ merged_columns[i]->insert_from(*current->all_columns[i],
current->pos);
}
} else {
//rows has get enough
@@ -153,7 +155,7 @@ Status PartitionSorter::partition_sort_read(Block*
output_block, bool* eos, int
}
}
for (size_t i = 0; i < num_columns; ++i) {
-
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
+ merged_columns[i]->insert_from(*current->all_columns[i],
current->pos);
}
break;
}
@@ -178,7 +180,7 @@ Status PartitionSorter::partition_sort_read(Block*
output_block, bool* eos, int
*_previous_row = current;
}
for (size_t i = 0; i < num_columns; ++i) {
-
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
+ merged_columns[i]->insert_from(*current->all_columns[i],
current->pos);
}
current_output_rows++;
break;
diff --git a/be/src/vec/common/sort/partition_sorter.h
b/be/src/vec/common/sort/partition_sorter.h
index 01e009d200d..77dcb683711 100644
--- a/be/src/vec/common/sort/partition_sorter.h
+++ b/be/src/vec/common/sort/partition_sorter.h
@@ -50,7 +50,7 @@ public:
SortCursorCmp(const MergeSortCursor& cursor) : row(cursor->pos),
impl(cursor.impl) {}
void reset() {
- impl->reset();
+ impl = nullptr;
row = 0;
}
bool compare_two_rows(const MergeSortCursor& rhs) const {
@@ -67,7 +67,7 @@ public:
return true;
}
int row = 0;
- std::shared_ptr<MergeSortCursorImpl> impl = nullptr;
+ MergeSortCursorImpl* impl = nullptr;
};
class PartitionSorter final : public Sorter {
diff --git a/be/src/vec/common/sort/sorter.cpp
b/be/src/vec/common/sort/sorter.cpp
index 89f1c7d73f1..eca7e15626b 100644
--- a/be/src/vec/common/sort/sorter.cpp
+++ b/be/src/vec/common/sort/sorter.cpp
@@ -59,44 +59,48 @@ namespace doris::vectorized {
void MergeSorterState::reset() {
auto empty_queue = std::priority_queue<MergeSortCursor>();
priority_queue_.swap(empty_queue);
- std::vector<std::shared_ptr<MergeSortCursorImpl>> empty_cursors(0);
- std::vector<std::shared_ptr<Block>> empty_blocks(0);
+ std::vector<MergeSortCursorImpl> empty_cursors(0);
+ cursors_.swap(empty_cursors);
+ std::vector<Block> empty_blocks(0);
sorted_blocks_.swap(empty_blocks);
unsorted_block_ = Block::create_unique(unsorted_block_->clone_empty());
in_mem_sorted_bocks_size_ = 0;
}
-void MergeSorterState::add_sorted_block(std::shared_ptr<Block> block) {
- auto rows = block->rows();
+Status MergeSorterState::add_sorted_block(Block& block) {
+ auto rows = block.rows();
if (0 == rows) {
- return;
+ return Status::OK();
}
- in_mem_sorted_bocks_size_ += block->bytes();
- sorted_blocks_.emplace_back(block);
+ in_mem_sorted_bocks_size_ += block.bytes();
+ sorted_blocks_.emplace_back(std::move(block));
num_rows_ += rows;
+ return Status::OK();
}
Status MergeSorterState::build_merge_tree(const SortDescription&
sort_description) {
for (auto& block : sorted_blocks_) {
- priority_queue_.emplace(
- MergeSortCursorImpl::create_shared(std::move(block),
sort_description));
+ cursors_.emplace_back(block, sort_description);
+ }
+
+ if (sorted_blocks_.size() > 1) {
+ for (auto& cursor : cursors_) {
+ priority_queue_.emplace(&cursor);
+ }
}
- sorted_blocks_.clear();
return Status::OK();
}
Status MergeSorterState::merge_sort_read(doris::vectorized::Block* block, int
batch_size,
bool* eos) {
- DCHECK(sorted_blocks_.empty());
- DCHECK(unsorted_block_->empty());
- if (priority_queue_.empty()) {
+ if (sorted_blocks_.empty()) {
*eos = true;
- } else if (priority_queue_.size() == 1) {
+ } else if (sorted_blocks_.size() == 1) {
if (offset_ != 0) {
- priority_queue_.top().impl->block->skip_num_rows(offset_);
+ sorted_blocks_[0].skip_num_rows(offset_);
}
- block->swap(*priority_queue_.top().impl->block);
+ block->swap(sorted_blocks_[0]);
*eos = true;
} else {
RETURN_IF_ERROR(_merge_sort_read_impl(batch_size, block, eos));
@@ -106,14 +110,9 @@ Status
MergeSorterState::merge_sort_read(doris::vectorized::Block* block, int ba
Status MergeSorterState::_merge_sort_read_impl(int batch_size,
doris::vectorized::Block* block,
bool* eos) {
- if (priority_queue_.empty()) {
- *eos = true;
- return Status::OK();
- }
- size_t num_columns = priority_queue_.top().impl->block->columns();
+ size_t num_columns = sorted_blocks_[0].columns();
- MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block(
- block, *priority_queue_.top().impl->block);
+ MutableBlock m_block =
VectorizedUtils::build_mutable_mem_reuse_block(block, sorted_blocks_[0]);
MutableColumns& merged_columns = m_block.mutable_columns();
/// Take rows from queue in right order and push to 'merged'.
@@ -124,7 +123,7 @@ Status MergeSorterState::_merge_sort_read_impl(int
batch_size, doris::vectorized
if (offset_ == 0) {
for (size_t i = 0; i < num_columns; ++i)
-
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
+ merged_columns[i]->insert_from(*current->all_columns[i],
current->pos);
++merged_rows;
} else {
offset_--;
@@ -135,9 +134,7 @@ Status MergeSorterState::_merge_sort_read_impl(int
batch_size, doris::vectorized
priority_queue_.push(current);
}
- if (merged_rows == batch_size) {
- break;
- }
+ if (merged_rows == batch_size) break;
}
block->set_columns(std::move(merged_columns));
@@ -264,22 +261,22 @@ Status FullSorter::_do_sort() {
// if one block totally greater the heap top of _block_priority_queue
// we can throw the block data directly.
if (_state->num_rows() < _offset + _limit) {
-
_state->add_sorted_block(Block::create_shared(std::move(desc_block)));
- _block_priority_queue.emplace(MergeSortCursorImpl::create_shared(
- _state->last_sorted_block(), _sort_description));
+ static_cast<void>(_state->add_sorted_block(desc_block));
+ _block_priority_queue.emplace(_pool->add(
+ new MergeSortCursorImpl(_state->last_sorted_block(),
_sort_description)));
} else {
- auto tmp_cursor_impl = MergeSortCursorImpl::create_shared(
- Block::create_shared(std::move(desc_block)),
_sort_description);
- MergeSortBlockCursor block_cursor(tmp_cursor_impl);
+ auto tmp_cursor_impl =
+ std::make_unique<MergeSortCursorImpl>(desc_block,
_sort_description);
+ MergeSortBlockCursor block_cursor(tmp_cursor_impl.get());
if (!block_cursor.totally_greater(_block_priority_queue.top())) {
- _state->add_sorted_block(tmp_cursor_impl->block);
-
_block_priority_queue.emplace(MergeSortCursorImpl::create_shared(
- _state->last_sorted_block(), _sort_description));
+ static_cast<void>(_state->add_sorted_block(desc_block));
+ _block_priority_queue.emplace(_pool->add(
+ new MergeSortCursorImpl(_state->last_sorted_block(),
_sort_description)));
}
}
} else {
// dispose normal sort logic
- _state->add_sorted_block(Block::create_shared(std::move(desc_block)));
+ static_cast<void>(_state->add_sorted_block(desc_block));
}
return Status::OK();
}
diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h
index daa871f5d48..2525ca8c0c1 100644
--- a/be/src/vec/common/sort/sorter.h
+++ b/be/src/vec/common/sort/sorter.h
@@ -59,7 +59,7 @@ public:
~MergeSorterState() = default;
- void add_sorted_block(std::shared_ptr<Block> block);
+ Status add_sorted_block(Block& block);
Status build_merge_tree(const SortDescription& sort_description);
@@ -72,19 +72,23 @@ public:
uint64_t num_rows() const { return num_rows_; }
- std::shared_ptr<Block> last_sorted_block() { return sorted_blocks_.back();
}
+ Block& last_sorted_block() { return sorted_blocks_.back(); }
- std::vector<std::shared_ptr<Block>>& get_sorted_block() { return
sorted_blocks_; }
+ std::vector<Block>& get_sorted_block() { return sorted_blocks_; }
std::priority_queue<MergeSortCursor>& get_priority_queue() { return
priority_queue_; }
+ std::vector<MergeSortCursorImpl>& get_cursors() { return cursors_; }
void reset();
std::unique_ptr<Block> unsorted_block_;
private:
+ int _calc_spill_blocks_to_merge() const;
+
Status _merge_sort_read_impl(int batch_size, doris::vectorized::Block*
block, bool* eos);
std::priority_queue<MergeSortCursor> priority_queue_;
- std::vector<std::shared_ptr<Block>> sorted_blocks_;
+ std::vector<MergeSortCursorImpl> cursors_;
+ std::vector<Block> sorted_blocks_;
size_t in_mem_sorted_bocks_size_ = 0;
uint64_t num_rows_ = 0;
diff --git a/be/src/vec/common/sort/topn_sorter.cpp
b/be/src/vec/common/sort/topn_sorter.cpp
index 1f24fb14c95..58c3cd2dd0c 100644
--- a/be/src/vec/common/sort/topn_sorter.cpp
+++ b/be/src/vec/common/sort/topn_sorter.cpp
@@ -72,16 +72,17 @@ Status TopNSorter::_do_sort(Block* block) {
// if one block totally greater the heap top of _block_priority_queue
// we can throw the block data directly.
if (_state->num_rows() < _offset + _limit) {
-
_state->add_sorted_block(Block::create_shared(std::move(sorted_block)));
- _block_priority_queue.emplace(MergeSortCursorImpl::create_shared(
- _state->last_sorted_block(), _sort_description));
+ RETURN_IF_ERROR(_state->add_sorted_block(sorted_block));
+ _block_priority_queue.emplace(_pool->add(
+ new MergeSortCursorImpl(_state->last_sorted_block(),
_sort_description)));
} else {
- auto tmp_cursor_impl = MergeSortCursorImpl::create_shared(
- Block::create_shared(std::move(sorted_block)),
_sort_description);
- MergeSortBlockCursor block_cursor(tmp_cursor_impl);
+ auto tmp_cursor_impl =
+ std::make_unique<MergeSortCursorImpl>(sorted_block,
_sort_description);
+ MergeSortBlockCursor block_cursor(tmp_cursor_impl.get());
if (!block_cursor.totally_greater(_block_priority_queue.top())) {
- _state->add_sorted_block(block_cursor.impl->block);
- _block_priority_queue.emplace(tmp_cursor_impl);
+ RETURN_IF_ERROR(_state->add_sorted_block(sorted_block));
+ _block_priority_queue.emplace(_pool->add(
+ new MergeSortCursorImpl(_state->last_sorted_block(),
_sort_description)));
}
}
} else {
diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h
index 8b627f50af7..e565819c9d6 100644
--- a/be/src/vec/core/sort_cursor.h
+++ b/be/src/vec/core/sort_cursor.h
@@ -120,8 +120,7 @@ private:
* It is used in priority queue.
*/
struct MergeSortCursorImpl {
- ENABLE_FACTORY_CREATOR(MergeSortCursorImpl);
- std::shared_ptr<Block> block;
+ ColumnRawPtrs all_columns;
ColumnRawPtrs sort_columns;
SortDescription desc;
size_t sort_columns_size = 0;
@@ -131,30 +130,37 @@ struct MergeSortCursorImpl {
MergeSortCursorImpl() = default;
virtual ~MergeSortCursorImpl() = default;
- MergeSortCursorImpl(std::shared_ptr<Block> block_, const SortDescription&
desc_)
- : block(block_), desc(desc_), sort_columns_size(desc.size()) {
- reset();
+ MergeSortCursorImpl(Block& block, const SortDescription& desc_)
+ : desc(desc_), sort_columns_size(desc.size()) {
+ reset(block);
}
MergeSortCursorImpl(const SortDescription& desc_)
- : block(Block::create_shared()), desc(desc_),
sort_columns_size(desc.size()) {}
+ : desc(desc_), sort_columns_size(desc.size()) {}
bool empty() const { return rows == 0; }
/// Set the cursor to the beginning of the new block.
- void reset() {
+ void reset(Block& block) {
+ all_columns.clear();
sort_columns.clear();
- auto columns = block->get_columns_and_convert();
+ auto columns = block.get_columns_and_convert();
+ size_t num_columns = columns.size();
+
+ for (size_t j = 0; j < num_columns; ++j) {
+ all_columns.push_back(columns[j].get());
+ }
+
for (size_t j = 0, size = desc.size(); j < size; ++j) {
auto& column_desc = desc[j];
size_t column_number = !column_desc.column_name.empty()
- ?
block->get_position_by_name(column_desc.column_name)
+ ?
block.get_position_by_name(column_desc.column_name)
: column_desc.column_number;
sort_columns.push_back(columns[column_number].get());
}
pos = 0;
- rows = block->rows();
+ rows = all_columns[0]->size();
}
bool is_first() const { return pos == 0; }
@@ -168,13 +174,11 @@ struct MergeSortCursorImpl {
using BlockSupplier = std::function<Status(Block*, bool* eos)>;
struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl {
- ENABLE_FACTORY_CREATOR(BlockSupplierSortCursorImpl);
BlockSupplierSortCursorImpl(const BlockSupplier& block_supplier,
const VExprContextSPtrs& ordering_expr,
const std::vector<bool>& is_asc_order,
const std::vector<bool>& nulls_first)
: _ordering_expr(ordering_expr), _block_supplier(block_supplier) {
- block = Block::create_shared();
sort_columns_size = ordering_expr.size();
desc.resize(ordering_expr.size());
@@ -191,21 +195,21 @@ struct BlockSupplierSortCursorImpl : public
MergeSortCursorImpl {
}
bool has_next_block() override {
- block->clear();
+ _block.clear();
Status status;
do {
- status = _block_supplier(block.get(), &_is_eof);
- } while (block->empty() && !_is_eof && status.ok());
+ status = _block_supplier(&_block, &_is_eof);
+ } while (_block.empty() && !_is_eof && status.ok());
// If status not ok, upper callers could not detect whether it is eof
or error.
// So that fatal here, and should throw exception in the future.
- if (status.ok() && !block->empty()) {
+ if (status.ok() && !_block.empty()) {
if (_ordering_expr.size() > 0) {
for (int i = 0; status.ok() && i < desc.size(); ++i) {
// TODO yiguolei: throw exception if status not ok in the
future
- status = _ordering_expr[i]->execute(block.get(),
&desc[i].column_number);
+ status = _ordering_expr[i]->execute(&_block,
&desc[i].column_number);
}
}
- MergeSortCursorImpl::reset();
+ MergeSortCursorImpl::reset(_block);
return status.ok();
} else if (!status.ok()) {
throw std::runtime_error(status.msg());
@@ -217,21 +221,32 @@ struct BlockSupplierSortCursorImpl : public
MergeSortCursorImpl {
if (_is_eof) {
return nullptr;
}
- return block.get();
+ return &_block;
+ }
+
+ size_t columns_num() const { return all_columns.size(); }
+
+ Block create_empty_blocks() const {
+ size_t num_columns = columns_num();
+ MutableColumns columns(num_columns);
+ for (size_t i = 0; i < num_columns; ++i) {
+ columns[i] = all_columns[i]->clone_empty();
+ }
+ return _block.clone_with_columns(std::move(columns));
}
VExprContextSPtrs _ordering_expr;
+ Block _block;
BlockSupplier _block_supplier {};
bool _is_eof = false;
};
/// For easy copying.
struct MergeSortCursor {
- ENABLE_FACTORY_CREATOR(MergeSortCursor);
- std::shared_ptr<MergeSortCursorImpl> impl;
+ MergeSortCursorImpl* impl;
- MergeSortCursor(std::shared_ptr<MergeSortCursorImpl> impl_) : impl(impl_)
{}
- MergeSortCursorImpl* operator->() const { return impl.get(); }
+ MergeSortCursor(MergeSortCursorImpl* impl_) : impl(impl_) {}
+ MergeSortCursorImpl* operator->() const { return impl; }
/// The specified row of this cursor is greater than the specified row of
another cursor.
int8_t greater_at(const MergeSortCursor& rhs, size_t lhs_pos, size_t
rhs_pos) const {
@@ -271,11 +286,10 @@ struct MergeSortCursor {
/// For easy copying.
struct MergeSortBlockCursor {
- ENABLE_FACTORY_CREATOR(MergeSortBlockCursor);
- std::shared_ptr<MergeSortCursorImpl> impl = nullptr;
+ MergeSortCursorImpl* impl = nullptr;
- MergeSortBlockCursor(std::shared_ptr<MergeSortCursorImpl> impl_) :
impl(impl_) {}
- MergeSortCursorImpl* operator->() const { return impl.get(); }
+ MergeSortBlockCursor(MergeSortCursorImpl* impl_) : impl(impl_) {}
+ MergeSortCursorImpl* operator->() const { return impl; }
/// The specified row of this cursor is greater than the specified row of
another cursor.
int8_t less_at(const MergeSortBlockCursor& rhs, int rows) const {
diff --git a/be/src/vec/runtime/vsorted_run_merger.cpp
b/be/src/vec/runtime/vsorted_run_merger.cpp
index f321622012f..3b17f957deb 100644
--- a/be/src/vec/runtime/vsorted_run_merger.cpp
+++ b/be/src/vec/runtime/vsorted_run_merger.cpp
@@ -28,6 +28,14 @@
#include "vec/core/column_with_type_and_name.h"
#include "vec/utils/util.hpp"
+namespace doris {
+namespace vectorized {
+class VExprContext;
+} // namespace vectorized
+} // namespace doris
+
+using std::vector;
+
namespace doris::vectorized {
VSortedRunMerger::VSortedRunMerger(const VExprContextSPtrs& ordering_expr,
@@ -60,14 +68,13 @@ void VSortedRunMerger::init_timers(RuntimeProfile* profile)
{
_get_next_block_timer = ADD_TIMER(profile, "MergeGetNextBlock");
}
-Status VSortedRunMerger::prepare(const std::vector<BlockSupplier>& input_runs)
{
+Status VSortedRunMerger::prepare(const vector<BlockSupplier>& input_runs) {
try {
for (const auto& supplier : input_runs) {
if (_use_sort_desc) {
-
_cursors.emplace_back(BlockSupplierSortCursorImpl::create_shared(supplier,
_desc));
+ _cursors.emplace_back(supplier, _desc);
} else {
-
_cursors.emplace_back(BlockSupplierSortCursorImpl::create_shared(
- supplier, _ordering_expr, _is_asc_order,
_nulls_first));
+ _cursors.emplace_back(supplier, _ordering_expr, _is_asc_order,
_nulls_first);
}
}
} catch (const std::exception& e) {
@@ -75,8 +82,15 @@ Status VSortedRunMerger::prepare(const
std::vector<BlockSupplier>& input_runs) {
}
for (auto& _cursor : _cursors) {
- if (!_cursor->_is_eof) {
- _priority_queue.push(MergeSortCursor(_cursor));
+ if (!_cursor._is_eof) {
+ _priority_queue.push(MergeSortCursor(&_cursor));
+ }
+ }
+
+ for (const auto& cursor : _cursors) {
+ if (!cursor._is_eof) {
+ _empty_block = cursor.create_empty_blocks();
+ break;
}
}
@@ -131,7 +145,7 @@ Status VSortedRunMerger::get_next(Block* output_block,
bool* eos) {
}
} else {
if (current->block_ptr() != nullptr) {
- for (int i = 0; i < current->block->columns(); i++) {
+ for (int i = 0; i < current->all_columns.size(); i++) {
auto& column_with_type =
current->block_ptr()->get_by_position(i);
column_with_type.column = column_with_type.column->cut(
current->pos, current->rows - current->pos);
@@ -148,9 +162,9 @@ Status VSortedRunMerger::get_next(Block* output_block,
bool* eos) {
}
}
} else {
- size_t num_columns = _priority_queue.top().impl->block->columns();
- MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block(
- output_block, *_priority_queue.top().impl->block);
+ size_t num_columns = _empty_block.columns();
+ MutableBlock m_block =
+ VectorizedUtils::build_mutable_mem_reuse_block(output_block,
_empty_block);
MutableColumns& merged_columns = m_block.mutable_columns();
if (num_columns != merged_columns.size()) {
diff --git a/be/src/vec/runtime/vsorted_run_merger.h
b/be/src/vec/runtime/vsorted_run_merger.h
index 844704fd130..943956d8c38 100644
--- a/be/src/vec/runtime/vsorted_run_merger.h
+++ b/be/src/vec/runtime/vsorted_run_merger.h
@@ -30,7 +30,9 @@
#include "vec/core/sort_description.h"
#include "vec/exprs/vexpr_fwd.h"
-namespace doris::vectorized {
+namespace doris {
+
+namespace vectorized {
// VSortedRunMerger is used to merge multiple sorted runs of blocks. A run is
a sorted
// sequence of blocks, which are fetched from a BlockSupplier function object.
@@ -76,12 +78,14 @@ protected:
bool _pipeline_engine_enabled = false;
- std::vector<std::shared_ptr<BlockSupplierSortCursorImpl>> _cursors;
+ std::vector<BlockSupplierSortCursorImpl> _cursors;
std::priority_queue<MergeSortCursor> _priority_queue;
/// In pipeline engine, if a cursor needs to read one more block from
supplier,
/// we make it as a pending cursor until the supplier is readable.
- std::shared_ptr<MergeSortCursorImpl> _pending_cursor = nullptr;
+ MergeSortCursorImpl* _pending_cursor = nullptr;
+
+ Block _empty_block;
// Times calls to get_next().
RuntimeProfile::Counter* _get_next_timer = nullptr;
@@ -101,4 +105,5 @@ private:
bool has_next_block(MergeSortCursor& current);
};
-} // namespace doris::vectorized
+} // namespace vectorized
+} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org