This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 3ac8347e3d4 [Improvement](sort) Free sort blocks if this block is
exhausted (#39306) (#39956)
3ac8347e3d4 is described below
commit 3ac8347e3d43673069cd56f005409a8f253d663c
Author: Gabriel <[email protected]>
AuthorDate: Fri Aug 30 13:35:43 2024 +0800
[Improvement](sort) Free sort blocks if this block is exhausted (#39306)
(#39956)
## Proposed changes
Cherry-pick of #39306 onto branch-2.1.
<!--Describe your changes.-->
---
be/src/vec/common/sort/partition_sorter.cpp | 42 ++++++++---------
be/src/vec/common/sort/partition_sorter.h | 4 +-
be/src/vec/common/sort/sorter.cpp | 71 +++++++++++++++--------------
be/src/vec/common/sort/sorter.h | 12 ++---
be/src/vec/common/sort/topn_sorter.cpp | 17 ++++---
be/src/vec/core/sort_cursor.h | 68 +++++++++++----------------
be/src/vec/runtime/vsorted_run_merger.cpp | 34 ++++----------
be/src/vec/runtime/vsorted_run_merger.h | 13 ++----
8 files changed, 112 insertions(+), 149 deletions(-)
diff --git a/be/src/vec/common/sort/partition_sorter.cpp
b/be/src/vec/common/sort/partition_sorter.cpp
index 1ea7c6de6a8..c363a41d1c7 100644
--- a/be/src/vec/common/sort/partition_sorter.cpp
+++ b/be/src/vec/common/sort/partition_sorter.cpp
@@ -58,20 +58,17 @@ Status PartitionSorter::append_block(Block* input_block) {
Block sorted_block =
VectorizedUtils::create_empty_columnswithtypename(_row_desc);
DCHECK(input_block->columns() == sorted_block.columns());
RETURN_IF_ERROR(partial_sort(*input_block, sorted_block));
- RETURN_IF_ERROR(_state->add_sorted_block(sorted_block));
+ _state->add_sorted_block(Block::create_shared(std::move(sorted_block)));
return Status::OK();
}
Status PartitionSorter::prepare_for_read() {
- auto& cursors = _state->get_cursors();
auto& blocks = _state->get_sorted_block();
auto& priority_queue = _state->get_priority_queue();
for (auto& block : blocks) {
- cursors.emplace_back(block, _sort_description);
- }
- for (auto& cursor : cursors) {
- priority_queue.push(MergeSortCursor(&cursor));
+ priority_queue.push(MergeSortCursorImpl::create_shared(block,
_sort_description));
}
+ blocks.clear();
return Status::OK();
}
@@ -84,29 +81,30 @@ void PartitionSorter::reset_sorter_state(RuntimeState*
runtime_state) {
}
Status PartitionSorter::get_next(RuntimeState* state, Block* block, bool* eos)
{
- if (_state->get_sorted_block().empty()) {
+ if (_state->get_priority_queue().empty()) {
+ *eos = true;
+ } else if (_state->get_priority_queue().size() == 1 && _has_global_limit) {
+ block->swap(*_state->get_priority_queue().top().impl->block);
+ block->set_num_rows(_partition_inner_limit);
*eos = true;
} else {
- if (_state->get_sorted_block().size() == 1 && _has_global_limit) {
- auto& sorted_block = _state->get_sorted_block()[0];
- block->swap(sorted_block);
- block->set_num_rows(_partition_inner_limit);
- *eos = true;
- } else {
- RETURN_IF_ERROR(partition_sort_read(block, eos,
state->batch_size()));
- }
+ RETURN_IF_ERROR(partition_sort_read(block, eos, state->batch_size()));
}
return Status::OK();
}
Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos,
int batch_size) {
- const auto& sorted_block = _state->get_sorted_block()[0];
- size_t num_columns = sorted_block.columns();
+ auto& priority_queue = _state->get_priority_queue();
+ if (priority_queue.empty()) {
+ *eos = true;
+ return Status::OK();
+ }
+ const auto& sorted_block = priority_queue.top().impl->block;
+ size_t num_columns = sorted_block->columns();
MutableBlock m_block =
- VectorizedUtils::build_mutable_mem_reuse_block(output_block,
sorted_block);
+ VectorizedUtils::build_mutable_mem_reuse_block(output_block,
*sorted_block);
MutableColumns& merged_columns = m_block.mutable_columns();
size_t current_output_rows = 0;
- auto& priority_queue = _state->get_priority_queue();
bool get_enough_data = false;
while (!priority_queue.empty()) {
@@ -121,7 +119,7 @@ Status PartitionSorter::partition_sort_read(Block*
output_block, bool* eos, int
//1 row_number no need to check distinct, just output
partition_inner_limit row
if ((current_output_rows + _output_total_rows) <
_partition_inner_limit) {
for (size_t i = 0; i < num_columns; ++i) {
- merged_columns[i]->insert_from(*current->all_columns[i],
current->pos);
+
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
}
} else {
//rows has get enough
@@ -155,7 +153,7 @@ Status PartitionSorter::partition_sort_read(Block*
output_block, bool* eos, int
}
}
for (size_t i = 0; i < num_columns; ++i) {
- merged_columns[i]->insert_from(*current->all_columns[i],
current->pos);
+
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
}
break;
}
@@ -180,7 +178,7 @@ Status PartitionSorter::partition_sort_read(Block*
output_block, bool* eos, int
*_previous_row = current;
}
for (size_t i = 0; i < num_columns; ++i) {
- merged_columns[i]->insert_from(*current->all_columns[i],
current->pos);
+
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
}
current_output_rows++;
break;
diff --git a/be/src/vec/common/sort/partition_sorter.h
b/be/src/vec/common/sort/partition_sorter.h
index 77dcb683711..01e009d200d 100644
--- a/be/src/vec/common/sort/partition_sorter.h
+++ b/be/src/vec/common/sort/partition_sorter.h
@@ -50,7 +50,7 @@ public:
SortCursorCmp(const MergeSortCursor& cursor) : row(cursor->pos),
impl(cursor.impl) {}
void reset() {
- impl = nullptr;
+ impl->reset();
row = 0;
}
bool compare_two_rows(const MergeSortCursor& rhs) const {
@@ -67,7 +67,7 @@ public:
return true;
}
int row = 0;
- MergeSortCursorImpl* impl = nullptr;
+ std::shared_ptr<MergeSortCursorImpl> impl = nullptr;
};
class PartitionSorter final : public Sorter {
diff --git a/be/src/vec/common/sort/sorter.cpp
b/be/src/vec/common/sort/sorter.cpp
index eca7e15626b..89f1c7d73f1 100644
--- a/be/src/vec/common/sort/sorter.cpp
+++ b/be/src/vec/common/sort/sorter.cpp
@@ -59,48 +59,44 @@ namespace doris::vectorized {
void MergeSorterState::reset() {
auto empty_queue = std::priority_queue<MergeSortCursor>();
priority_queue_.swap(empty_queue);
- std::vector<MergeSortCursorImpl> empty_cursors(0);
- cursors_.swap(empty_cursors);
- std::vector<Block> empty_blocks(0);
+ std::vector<std::shared_ptr<MergeSortCursorImpl>> empty_cursors(0);
+ std::vector<std::shared_ptr<Block>> empty_blocks(0);
sorted_blocks_.swap(empty_blocks);
unsorted_block_ = Block::create_unique(unsorted_block_->clone_empty());
in_mem_sorted_bocks_size_ = 0;
}
-Status MergeSorterState::add_sorted_block(Block& block) {
- auto rows = block.rows();
+void MergeSorterState::add_sorted_block(std::shared_ptr<Block> block) {
+ auto rows = block->rows();
if (0 == rows) {
- return Status::OK();
+ return;
}
- in_mem_sorted_bocks_size_ += block.bytes();
- sorted_blocks_.emplace_back(std::move(block));
+ in_mem_sorted_bocks_size_ += block->bytes();
+ sorted_blocks_.emplace_back(block);
num_rows_ += rows;
- return Status::OK();
}
Status MergeSorterState::build_merge_tree(const SortDescription&
sort_description) {
for (auto& block : sorted_blocks_) {
- cursors_.emplace_back(block, sort_description);
- }
-
- if (sorted_blocks_.size() > 1) {
- for (auto& cursor : cursors_) {
- priority_queue_.emplace(&cursor);
- }
+ priority_queue_.emplace(
+ MergeSortCursorImpl::create_shared(std::move(block),
sort_description));
}
+ sorted_blocks_.clear();
return Status::OK();
}
Status MergeSorterState::merge_sort_read(doris::vectorized::Block* block, int
batch_size,
bool* eos) {
- if (sorted_blocks_.empty()) {
+ DCHECK(sorted_blocks_.empty());
+ DCHECK(unsorted_block_->empty());
+ if (priority_queue_.empty()) {
*eos = true;
- } else if (sorted_blocks_.size() == 1) {
+ } else if (priority_queue_.size() == 1) {
if (offset_ != 0) {
- sorted_blocks_[0].skip_num_rows(offset_);
+ priority_queue_.top().impl->block->skip_num_rows(offset_);
}
- block->swap(sorted_blocks_[0]);
+ block->swap(*priority_queue_.top().impl->block);
*eos = true;
} else {
RETURN_IF_ERROR(_merge_sort_read_impl(batch_size, block, eos));
@@ -110,9 +106,14 @@ Status
MergeSorterState::merge_sort_read(doris::vectorized::Block* block, int ba
Status MergeSorterState::_merge_sort_read_impl(int batch_size,
doris::vectorized::Block* block,
bool* eos) {
- size_t num_columns = sorted_blocks_[0].columns();
+ if (priority_queue_.empty()) {
+ *eos = true;
+ return Status::OK();
+ }
+ size_t num_columns = priority_queue_.top().impl->block->columns();
- MutableBlock m_block =
VectorizedUtils::build_mutable_mem_reuse_block(block, sorted_blocks_[0]);
+ MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block(
+ block, *priority_queue_.top().impl->block);
MutableColumns& merged_columns = m_block.mutable_columns();
/// Take rows from queue in right order and push to 'merged'.
@@ -123,7 +124,7 @@ Status MergeSorterState::_merge_sort_read_impl(int
batch_size, doris::vectorized
if (offset_ == 0) {
for (size_t i = 0; i < num_columns; ++i)
- merged_columns[i]->insert_from(*current->all_columns[i],
current->pos);
+
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
++merged_rows;
} else {
offset_--;
@@ -134,7 +135,9 @@ Status MergeSorterState::_merge_sort_read_impl(int
batch_size, doris::vectorized
priority_queue_.push(current);
}
- if (merged_rows == batch_size) break;
+ if (merged_rows == batch_size) {
+ break;
+ }
}
block->set_columns(std::move(merged_columns));
@@ -261,22 +264,22 @@ Status FullSorter::_do_sort() {
// if one block totally greater the heap top of _block_priority_queue
// we can throw the block data directly.
if (_state->num_rows() < _offset + _limit) {
- static_cast<void>(_state->add_sorted_block(desc_block));
- _block_priority_queue.emplace(_pool->add(
- new MergeSortCursorImpl(_state->last_sorted_block(),
_sort_description)));
+
_state->add_sorted_block(Block::create_shared(std::move(desc_block)));
+ _block_priority_queue.emplace(MergeSortCursorImpl::create_shared(
+ _state->last_sorted_block(), _sort_description));
} else {
- auto tmp_cursor_impl =
- std::make_unique<MergeSortCursorImpl>(desc_block,
_sort_description);
- MergeSortBlockCursor block_cursor(tmp_cursor_impl.get());
+ auto tmp_cursor_impl = MergeSortCursorImpl::create_shared(
+ Block::create_shared(std::move(desc_block)),
_sort_description);
+ MergeSortBlockCursor block_cursor(tmp_cursor_impl);
if (!block_cursor.totally_greater(_block_priority_queue.top())) {
- static_cast<void>(_state->add_sorted_block(desc_block));
- _block_priority_queue.emplace(_pool->add(
- new MergeSortCursorImpl(_state->last_sorted_block(),
_sort_description)));
+ _state->add_sorted_block(tmp_cursor_impl->block);
+
_block_priority_queue.emplace(MergeSortCursorImpl::create_shared(
+ _state->last_sorted_block(), _sort_description));
}
}
} else {
// dispose normal sort logic
- static_cast<void>(_state->add_sorted_block(desc_block));
+ _state->add_sorted_block(Block::create_shared(std::move(desc_block)));
}
return Status::OK();
}
diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h
index 2525ca8c0c1..daa871f5d48 100644
--- a/be/src/vec/common/sort/sorter.h
+++ b/be/src/vec/common/sort/sorter.h
@@ -59,7 +59,7 @@ public:
~MergeSorterState() = default;
- Status add_sorted_block(Block& block);
+ void add_sorted_block(std::shared_ptr<Block> block);
Status build_merge_tree(const SortDescription& sort_description);
@@ -72,23 +72,19 @@ public:
uint64_t num_rows() const { return num_rows_; }
- Block& last_sorted_block() { return sorted_blocks_.back(); }
+ std::shared_ptr<Block> last_sorted_block() { return sorted_blocks_.back();
}
- std::vector<Block>& get_sorted_block() { return sorted_blocks_; }
+ std::vector<std::shared_ptr<Block>>& get_sorted_block() { return
sorted_blocks_; }
std::priority_queue<MergeSortCursor>& get_priority_queue() { return
priority_queue_; }
- std::vector<MergeSortCursorImpl>& get_cursors() { return cursors_; }
void reset();
std::unique_ptr<Block> unsorted_block_;
private:
- int _calc_spill_blocks_to_merge() const;
-
Status _merge_sort_read_impl(int batch_size, doris::vectorized::Block*
block, bool* eos);
std::priority_queue<MergeSortCursor> priority_queue_;
- std::vector<MergeSortCursorImpl> cursors_;
- std::vector<Block> sorted_blocks_;
+ std::vector<std::shared_ptr<Block>> sorted_blocks_;
size_t in_mem_sorted_bocks_size_ = 0;
uint64_t num_rows_ = 0;
diff --git a/be/src/vec/common/sort/topn_sorter.cpp
b/be/src/vec/common/sort/topn_sorter.cpp
index 58c3cd2dd0c..1f24fb14c95 100644
--- a/be/src/vec/common/sort/topn_sorter.cpp
+++ b/be/src/vec/common/sort/topn_sorter.cpp
@@ -72,17 +72,16 @@ Status TopNSorter::_do_sort(Block* block) {
// if one block totally greater the heap top of _block_priority_queue
// we can throw the block data directly.
if (_state->num_rows() < _offset + _limit) {
- RETURN_IF_ERROR(_state->add_sorted_block(sorted_block));
- _block_priority_queue.emplace(_pool->add(
- new MergeSortCursorImpl(_state->last_sorted_block(),
_sort_description)));
+
_state->add_sorted_block(Block::create_shared(std::move(sorted_block)));
+ _block_priority_queue.emplace(MergeSortCursorImpl::create_shared(
+ _state->last_sorted_block(), _sort_description));
} else {
- auto tmp_cursor_impl =
- std::make_unique<MergeSortCursorImpl>(sorted_block,
_sort_description);
- MergeSortBlockCursor block_cursor(tmp_cursor_impl.get());
+ auto tmp_cursor_impl = MergeSortCursorImpl::create_shared(
+ Block::create_shared(std::move(sorted_block)),
_sort_description);
+ MergeSortBlockCursor block_cursor(tmp_cursor_impl);
if (!block_cursor.totally_greater(_block_priority_queue.top())) {
- RETURN_IF_ERROR(_state->add_sorted_block(sorted_block));
- _block_priority_queue.emplace(_pool->add(
- new MergeSortCursorImpl(_state->last_sorted_block(),
_sort_description)));
+ _state->add_sorted_block(block_cursor.impl->block);
+ _block_priority_queue.emplace(tmp_cursor_impl);
}
}
} else {
diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h
index e565819c9d6..8b627f50af7 100644
--- a/be/src/vec/core/sort_cursor.h
+++ b/be/src/vec/core/sort_cursor.h
@@ -120,7 +120,8 @@ private:
* It is used in priority queue.
*/
struct MergeSortCursorImpl {
- ColumnRawPtrs all_columns;
+ ENABLE_FACTORY_CREATOR(MergeSortCursorImpl);
+ std::shared_ptr<Block> block;
ColumnRawPtrs sort_columns;
SortDescription desc;
size_t sort_columns_size = 0;
@@ -130,37 +131,30 @@ struct MergeSortCursorImpl {
MergeSortCursorImpl() = default;
virtual ~MergeSortCursorImpl() = default;
- MergeSortCursorImpl(Block& block, const SortDescription& desc_)
- : desc(desc_), sort_columns_size(desc.size()) {
- reset(block);
+ MergeSortCursorImpl(std::shared_ptr<Block> block_, const SortDescription&
desc_)
+ : block(block_), desc(desc_), sort_columns_size(desc.size()) {
+ reset();
}
MergeSortCursorImpl(const SortDescription& desc_)
- : desc(desc_), sort_columns_size(desc.size()) {}
+ : block(Block::create_shared()), desc(desc_),
sort_columns_size(desc.size()) {}
bool empty() const { return rows == 0; }
/// Set the cursor to the beginning of the new block.
- void reset(Block& block) {
- all_columns.clear();
+ void reset() {
sort_columns.clear();
- auto columns = block.get_columns_and_convert();
- size_t num_columns = columns.size();
-
- for (size_t j = 0; j < num_columns; ++j) {
- all_columns.push_back(columns[j].get());
- }
-
+ auto columns = block->get_columns_and_convert();
for (size_t j = 0, size = desc.size(); j < size; ++j) {
auto& column_desc = desc[j];
size_t column_number = !column_desc.column_name.empty()
- ?
block.get_position_by_name(column_desc.column_name)
+ ?
block->get_position_by_name(column_desc.column_name)
: column_desc.column_number;
sort_columns.push_back(columns[column_number].get());
}
pos = 0;
- rows = all_columns[0]->size();
+ rows = block->rows();
}
bool is_first() const { return pos == 0; }
@@ -174,11 +168,13 @@ struct MergeSortCursorImpl {
using BlockSupplier = std::function<Status(Block*, bool* eos)>;
struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl {
+ ENABLE_FACTORY_CREATOR(BlockSupplierSortCursorImpl);
BlockSupplierSortCursorImpl(const BlockSupplier& block_supplier,
const VExprContextSPtrs& ordering_expr,
const std::vector<bool>& is_asc_order,
const std::vector<bool>& nulls_first)
: _ordering_expr(ordering_expr), _block_supplier(block_supplier) {
+ block = Block::create_shared();
sort_columns_size = ordering_expr.size();
desc.resize(ordering_expr.size());
@@ -195,21 +191,21 @@ struct BlockSupplierSortCursorImpl : public
MergeSortCursorImpl {
}
bool has_next_block() override {
- _block.clear();
+ block->clear();
Status status;
do {
- status = _block_supplier(&_block, &_is_eof);
- } while (_block.empty() && !_is_eof && status.ok());
+ status = _block_supplier(block.get(), &_is_eof);
+ } while (block->empty() && !_is_eof && status.ok());
// If status not ok, upper callers could not detect whether it is eof
or error.
// So that fatal here, and should throw exception in the future.
- if (status.ok() && !_block.empty()) {
+ if (status.ok() && !block->empty()) {
if (_ordering_expr.size() > 0) {
for (int i = 0; status.ok() && i < desc.size(); ++i) {
// TODO yiguolei: throw exception if status not ok in the
future
- status = _ordering_expr[i]->execute(&_block,
&desc[i].column_number);
+ status = _ordering_expr[i]->execute(block.get(),
&desc[i].column_number);
}
}
- MergeSortCursorImpl::reset(_block);
+ MergeSortCursorImpl::reset();
return status.ok();
} else if (!status.ok()) {
throw std::runtime_error(status.msg());
@@ -221,32 +217,21 @@ struct BlockSupplierSortCursorImpl : public
MergeSortCursorImpl {
if (_is_eof) {
return nullptr;
}
- return &_block;
- }
-
- size_t columns_num() const { return all_columns.size(); }
-
- Block create_empty_blocks() const {
- size_t num_columns = columns_num();
- MutableColumns columns(num_columns);
- for (size_t i = 0; i < num_columns; ++i) {
- columns[i] = all_columns[i]->clone_empty();
- }
- return _block.clone_with_columns(std::move(columns));
+ return block.get();
}
VExprContextSPtrs _ordering_expr;
- Block _block;
BlockSupplier _block_supplier {};
bool _is_eof = false;
};
/// For easy copying.
struct MergeSortCursor {
- MergeSortCursorImpl* impl;
+ ENABLE_FACTORY_CREATOR(MergeSortCursor);
+ std::shared_ptr<MergeSortCursorImpl> impl;
- MergeSortCursor(MergeSortCursorImpl* impl_) : impl(impl_) {}
- MergeSortCursorImpl* operator->() const { return impl; }
+ MergeSortCursor(std::shared_ptr<MergeSortCursorImpl> impl_) : impl(impl_)
{}
+ MergeSortCursorImpl* operator->() const { return impl.get(); }
/// The specified row of this cursor is greater than the specified row of
another cursor.
int8_t greater_at(const MergeSortCursor& rhs, size_t lhs_pos, size_t
rhs_pos) const {
@@ -286,10 +271,11 @@ struct MergeSortCursor {
/// For easy copying.
struct MergeSortBlockCursor {
- MergeSortCursorImpl* impl = nullptr;
+ ENABLE_FACTORY_CREATOR(MergeSortBlockCursor);
+ std::shared_ptr<MergeSortCursorImpl> impl = nullptr;
- MergeSortBlockCursor(MergeSortCursorImpl* impl_) : impl(impl_) {}
- MergeSortCursorImpl* operator->() const { return impl; }
+ MergeSortBlockCursor(std::shared_ptr<MergeSortCursorImpl> impl_) :
impl(impl_) {}
+ MergeSortCursorImpl* operator->() const { return impl.get(); }
/// The specified row of this cursor is greater than the specified row of
another cursor.
int8_t less_at(const MergeSortBlockCursor& rhs, int rows) const {
diff --git a/be/src/vec/runtime/vsorted_run_merger.cpp
b/be/src/vec/runtime/vsorted_run_merger.cpp
index 3b17f957deb..f321622012f 100644
--- a/be/src/vec/runtime/vsorted_run_merger.cpp
+++ b/be/src/vec/runtime/vsorted_run_merger.cpp
@@ -28,14 +28,6 @@
#include "vec/core/column_with_type_and_name.h"
#include "vec/utils/util.hpp"
-namespace doris {
-namespace vectorized {
-class VExprContext;
-} // namespace vectorized
-} // namespace doris
-
-using std::vector;
-
namespace doris::vectorized {
VSortedRunMerger::VSortedRunMerger(const VExprContextSPtrs& ordering_expr,
@@ -68,13 +60,14 @@ void VSortedRunMerger::init_timers(RuntimeProfile* profile)
{
_get_next_block_timer = ADD_TIMER(profile, "MergeGetNextBlock");
}
-Status VSortedRunMerger::prepare(const vector<BlockSupplier>& input_runs) {
+Status VSortedRunMerger::prepare(const std::vector<BlockSupplier>& input_runs)
{
try {
for (const auto& supplier : input_runs) {
if (_use_sort_desc) {
- _cursors.emplace_back(supplier, _desc);
+
_cursors.emplace_back(BlockSupplierSortCursorImpl::create_shared(supplier,
_desc));
} else {
- _cursors.emplace_back(supplier, _ordering_expr, _is_asc_order,
_nulls_first);
+
_cursors.emplace_back(BlockSupplierSortCursorImpl::create_shared(
+ supplier, _ordering_expr, _is_asc_order,
_nulls_first));
}
}
} catch (const std::exception& e) {
@@ -82,15 +75,8 @@ Status VSortedRunMerger::prepare(const
vector<BlockSupplier>& input_runs) {
}
for (auto& _cursor : _cursors) {
- if (!_cursor._is_eof) {
- _priority_queue.push(MergeSortCursor(&_cursor));
- }
- }
-
- for (const auto& cursor : _cursors) {
- if (!cursor._is_eof) {
- _empty_block = cursor.create_empty_blocks();
- break;
+ if (!_cursor->_is_eof) {
+ _priority_queue.push(MergeSortCursor(_cursor));
}
}
@@ -145,7 +131,7 @@ Status VSortedRunMerger::get_next(Block* output_block,
bool* eos) {
}
} else {
if (current->block_ptr() != nullptr) {
- for (int i = 0; i < current->all_columns.size(); i++) {
+ for (int i = 0; i < current->block->columns(); i++) {
auto& column_with_type =
current->block_ptr()->get_by_position(i);
column_with_type.column = column_with_type.column->cut(
current->pos, current->rows - current->pos);
@@ -162,9 +148,9 @@ Status VSortedRunMerger::get_next(Block* output_block,
bool* eos) {
}
}
} else {
- size_t num_columns = _empty_block.columns();
- MutableBlock m_block =
- VectorizedUtils::build_mutable_mem_reuse_block(output_block,
_empty_block);
+ size_t num_columns = _priority_queue.top().impl->block->columns();
+ MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block(
+ output_block, *_priority_queue.top().impl->block);
MutableColumns& merged_columns = m_block.mutable_columns();
if (num_columns != merged_columns.size()) {
diff --git a/be/src/vec/runtime/vsorted_run_merger.h
b/be/src/vec/runtime/vsorted_run_merger.h
index 943956d8c38..844704fd130 100644
--- a/be/src/vec/runtime/vsorted_run_merger.h
+++ b/be/src/vec/runtime/vsorted_run_merger.h
@@ -30,9 +30,7 @@
#include "vec/core/sort_description.h"
#include "vec/exprs/vexpr_fwd.h"
-namespace doris {
-
-namespace vectorized {
+namespace doris::vectorized {
// VSortedRunMerger is used to merge multiple sorted runs of blocks. A run is
a sorted
// sequence of blocks, which are fetched from a BlockSupplier function object.
@@ -78,14 +76,12 @@ protected:
bool _pipeline_engine_enabled = false;
- std::vector<BlockSupplierSortCursorImpl> _cursors;
+ std::vector<std::shared_ptr<BlockSupplierSortCursorImpl>> _cursors;
std::priority_queue<MergeSortCursor> _priority_queue;
/// In pipeline engine, if a cursor needs to read one more block from
supplier,
/// we make it as a pending cursor until the supplier is readable.
- MergeSortCursorImpl* _pending_cursor = nullptr;
-
- Block _empty_block;
+ std::shared_ptr<MergeSortCursorImpl> _pending_cursor = nullptr;
// Times calls to get_next().
RuntimeProfile::Counter* _get_next_timer = nullptr;
@@ -105,5 +101,4 @@ private:
bool has_next_block(MergeSortCursor& current);
};
-} // namespace vectorized
-} // namespace doris
+} // namespace doris::vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]