This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e4af923f62a [refactor](sort merger) Refine sort merger (#48075)
e4af923f62a is described below
commit e4af923f62abfb65ef4a5da7e8a608fb23e76d19
Author: Gabriel <[email protected]>
AuthorDate: Thu Feb 20 10:10:15 2025 +0800
[refactor](sort merger) Refine sort merger (#48075)
---
be/src/pipeline/local_exchange/local_exchanger.cpp | 4 +-
be/src/vec/core/sort_cursor.h | 23 +--
be/src/vec/runtime/vsorted_run_merger.cpp | 82 +++++----
be/src/vec/runtime/vsorted_run_merger.h | 6 +-
be/test/vec/runtime/sort_merger_test.cpp | 192 ++++++++++++++++++++-
5 files changed, 241 insertions(+), 66 deletions(-)
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp
b/be/src/pipeline/local_exchange/local_exchanger.cpp
index 76a8a8e1274..4f4dff8b037 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -427,9 +427,9 @@ Status LocalMergeSortExchanger::build_merger(RuntimeState*
state,
// If this block is the last block, we should block this
pipeline task to wait for
// the next block.
// TODO: LocalMergeSortExchanger should be refactored.
- if (_data_queue[id].data_queue.size_approx() == 0) {
+ if (_data_queue[id].data_queue.size_approx() == 0 && !*eos) {
std::unique_lock l(*_m[id]);
- if (_data_queue[id].data_queue.size_approx() == 0) {
+ if (_data_queue[id].data_queue.size_approx() == 0 &&
!*eos) {
local_state->get_dependency(id)->block();
}
}
diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h
index 72aa4cbb4dc..3c833ec9c24 100644
--- a/be/src/vec/core/sort_cursor.h
+++ b/be/src/vec/core/sort_cursor.h
@@ -171,8 +171,9 @@ struct MergeSortCursorImpl {
void next(size_t size = 1) { pos += size; }
size_t get_size() const { return rows; }
- virtual bool has_next_block() { return false; }
+ virtual void process_next() {}
virtual Block* block_ptr() { return nullptr; }
+ virtual bool eof() const { return false; }
};
using BlockSupplier = std::function<Status(Block*, bool* eos)>;
@@ -192,38 +193,32 @@ struct BlockSupplierSortCursorImpl : public
MergeSortCursorImpl {
desc[i].direction = is_asc_order[i] ? 1 : -1;
desc[i].nulls_direction = nulls_first[i] ? -desc[i].direction :
desc[i].direction;
}
- has_next_block();
+ process_next();
}
BlockSupplierSortCursorImpl(BlockSupplier block_supplier, const
SortDescription& desc_)
: MergeSortCursorImpl(desc_),
_block_supplier(std::move(block_supplier)) {
- has_next_block();
+ process_next();
}
- bool has_next_block() override {
+ void process_next() override {
if (_is_eof) {
- return false;
+ return;
}
block->clear();
THROW_IF_ERROR(_block_supplier(block.get(), &_is_eof));
- DCHECK(!block->empty() xor _is_eof);
+ DCHECK(!block->empty() or _is_eof);
if (!block->empty()) {
DCHECK_EQ(_ordering_expr.size(), desc.size());
for (int i = 0; i < desc.size(); ++i) {
THROW_IF_ERROR(_ordering_expr[i]->execute(block.get(),
&desc[i].column_number));
}
MergeSortCursorImpl::reset();
- return true;
}
- return false;
}
- Block* block_ptr() override {
- if (_is_eof) {
- return nullptr;
- }
- return block.get();
- }
+ Block* block_ptr() override { return block.get(); }
+ bool eof() const override { return is_last() && _is_eof; }
VExprContextSPtrs _ordering_expr;
BlockSupplier _block_supplier {};
diff --git a/be/src/vec/runtime/vsorted_run_merger.cpp
b/be/src/vec/runtime/vsorted_run_merger.cpp
index a145c216bbb..16150cab63f 100644
--- a/be/src/vec/runtime/vsorted_run_merger.cpp
+++ b/be/src/vec/runtime/vsorted_run_merger.cpp
@@ -76,9 +76,9 @@ Status VSortedRunMerger::prepare(const
std::vector<BlockSupplier>& input_runs) {
return Status::Cancelled(e.what());
}
- for (auto& _cursor : _cursors) {
- if (!_cursor->_is_eof) {
- _priority_queue.push(MergeSortCursor(_cursor));
+ for (auto& cursor : _cursors) {
+ if (!cursor->eof()) {
+ _priority_queue.push(MergeSortCursor(cursor));
}
}
@@ -93,7 +93,11 @@ Status VSortedRunMerger::get_next(Block* output_block, bool*
eos) {
if (_pending_cursor != nullptr) {
MergeSortCursor cursor(_pending_cursor);
- if (has_next_block(cursor)) {
+ {
+ ScopedTimer<MonotonicStopWatch> timer1(_get_next_block_timer);
+ cursor->process_next();
+ }
+ if (!cursor->eof()) {
_priority_queue.push(cursor);
}
_pending_cursor = nullptr;
@@ -112,42 +116,39 @@ Status VSortedRunMerger::get_next(Block* output_block,
bool* eos) {
return Status::OK();
} else if (_priority_queue.size() == 1) {
auto current = _priority_queue.top();
- while (_offset != 0 && current->block_ptr() != nullptr) {
- if (_offset >= current->rows - current->pos) {
- _offset -= (current->rows - current->pos);
- _pending_cursor = current.impl;
+ DCHECK(!current->eof());
+ DCHECK(current->block_ptr() != nullptr);
+ while (_offset != 0) {
+ auto process_rows = std::min(current->rows - current->pos,
_offset);
+ current->next(process_rows);
+ _offset -= process_rows;
+ if (current->is_last(0)) {
_priority_queue.pop();
+ if (current->eof()) {
+ *eos = true;
+ } else {
+ _pending_cursor = current.impl;
+ }
return Status::OK();
- } else {
- current->pos += _offset;
- _offset = 0;
}
}
- if (current->is_first()) {
- if (current->block_ptr() != nullptr) {
- current->block_ptr()->swap(*output_block);
- _pending_cursor = current.impl;
- _priority_queue.pop();
- return Status::OK();
- } else {
- *eos = true;
+ if (!current->is_first()) {
+ for (int i = 0; i < current->block->columns(); i++) {
+ auto& column_with_type =
current->block_ptr()->get_by_position(i);
+ column_with_type.column =
+ column_with_type.column->cut(current->pos,
current->rows - current->pos);
}
+ }
+ current->block_ptr()->swap(*output_block);
+ current->next(current->rows - current->pos);
+ if (current->eof()) {
+ *eos = true;
} else {
- if (current->block_ptr() != nullptr) {
- for (int i = 0; i < current->block->columns(); i++) {
- auto& column_with_type =
current->block_ptr()->get_by_position(i);
- column_with_type.column = column_with_type.column->cut(
- current->pos, current->rows - current->pos);
- }
- current->block_ptr()->swap(*output_block);
- _pending_cursor = current.impl;
- _priority_queue.pop();
- return Status::OK();
- } else {
- *eos = true;
- }
+ _pending_cursor = current.impl;
}
+ _priority_queue.pop();
+ return Status::OK();
} else {
size_t num_columns = _priority_queue.top().impl->block->columns();
MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block(
@@ -191,7 +192,7 @@ Status VSortedRunMerger::get_next(Block* output_block,
bool* eos) {
++merged_rows;
}
- if (!next_heap(current)) {
+ if (_need_more_data(current)) {
do_insert();
return Status::OK();
}
@@ -208,20 +209,17 @@ Status VSortedRunMerger::get_next(Block* output_block,
bool* eos) {
return Status::OK();
}
-bool VSortedRunMerger::next_heap(MergeSortCursor& current) {
+bool VSortedRunMerger::_need_more_data(MergeSortCursor& current) {
if (!current->is_last()) {
current->next();
_priority_queue.push(current);
+ return false;
+ } else if (current->eof()) {
+ return false;
+ } else {
+ _pending_cursor = current.impl;
return true;
}
-
- _pending_cursor = current.impl;
- return false;
-}
-
-inline bool
VSortedRunMerger::has_next_block(doris::vectorized::MergeSortCursor& current) {
- ScopedTimer<MonotonicStopWatch> timer(_get_next_block_timer);
- return current->has_next_block();
}
} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/runtime/vsorted_run_merger.h
b/be/src/vec/runtime/vsorted_run_merger.h
index 898b52a8601..d44a5b59dbd 100644
--- a/be/src/vec/runtime/vsorted_run_merger.h
+++ b/be/src/vec/runtime/vsorted_run_merger.h
@@ -92,10 +92,8 @@ protected:
private:
void init_timers(RuntimeProfile* profile);
-
- /// In pipeline engine, return false if need to read one more block from
sender.
- bool next_heap(MergeSortCursor& current);
- bool has_next_block(MergeSortCursor& current);
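+ // Advances `current`. Returns true when its block is exhausted but its stream has not reached eos: the cursor is parked in _pending_cursor and the caller must stop merging so the next block can be read.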
+ bool _need_more_data(MergeSortCursor& current);
};
} // namespace doris::vectorized
diff --git a/be/test/vec/runtime/sort_merger_test.cpp
b/be/test/vec/runtime/sort_merger_test.cpp
index a36b436deec..dece7b31074 100644
--- a/be/test/vec/runtime/sort_merger_test.cpp
+++ b/be/test/vec/runtime/sort_merger_test.cpp
@@ -35,7 +35,12 @@ public:
TEST(SortMergerTest, NULL_FIRST_ASC) {
/**
- * in: [NULL, 1, 2, 3, 4], [NULL, 1, 2, 3, 4], [NULL, 1, 2, 3, 4], [NULL,
1, 2, 3, 4], [NULL, 1, 2, 3, 4]
+ * in: [([NULL, 1, 2, 3, 4], eos = false), ([], eos = true)]
+ * [([NULL, 1, 2, 3, 4], eos = false), ([], eos = true)]
+ * [([NULL, 1, 2, 3, 4], eos = false), ([], eos = true)]
+ * [([NULL, 1, 2, 3, 4], eos = false), ([], eos = true)]
+ * [([NULL, 1, 2, 3, 4], eos = false), ([], eos = true)]
+ * offset = 0, limit = -1, NULL_FIRST, ASC
* out: [NULL, NULL, NULL, NULL, NULL], [1, 1, 1, 1, 1], [2, 2, 2, 2, 2],
[3, 3, 3, 3, 3], [4], [4], [4], [4], [4]
*/
const int num_children = 5;
@@ -117,7 +122,12 @@ TEST(SortMergerTest, NULL_FIRST_ASC) {
TEST(SortMergerTest, NULL_LAST_DESC) {
/**
- * in: [4, 3, 2, 1, NULL], [4, 3, 2, 1, NULL], [4, 3, 2, 1, NULL], [4, 3,
2, 1, NULL], [4, 3, 2, 1, NULL]
+ * in: [([4, 3, 2, 1, NULL], eos = false), ([], eos = true)]
+ * [([4, 3, 2, 1, NULL], eos = false), ([], eos = true)]
+ * [([4, 3, 2, 1, NULL], eos = false), ([], eos = true)]
+ * [([4, 3, 2, 1, NULL], eos = false), ([], eos = true)]
+ * [([4, 3, 2, 1, NULL], eos = false), ([], eos = true)]
+ * offset = 0, limit = -1, NULL_LAST, DESC
* out: [4, 4, 4, 4, 4], [3, 3, 3, 3, 3], [2, 2, 2, 2, 2], [1, 1, 1, 1,
1], [NULL], [NULL], [NULL], [NULL], [NULL]
*/
const int num_children = 5;
@@ -195,8 +205,12 @@ TEST(SortMergerTest, NULL_LAST_DESC) {
TEST(SortMergerTest, TEST_LIMIT) {
/**
- * in: [NULL, 1, 2, 3, 4], [NULL, 1, 2, 3, 4], [NULL, 1, 2, 3, 4], [NULL,
1, 2, 3, 4], [NULL, 1, 2, 3, 4]
- * offset = 20, limit = 1
+ * in: [([NULL, 1, 2, 3, 4], eos = false), ([], eos = true)]
+ * [([NULL, 1, 2, 3, 4], eos = false), ([], eos = true)]
+ * [([NULL, 1, 2, 3, 4], eos = false), ([], eos = true)]
+ * [([NULL, 1, 2, 3, 4], eos = false), ([], eos = true)]
+ * [([NULL, 1, 2, 3, 4], eos = false), ([], eos = true)]
+ * offset = 20, limit = 1, NULL_FIRST, ASC
* out: [4]
*/
const int num_children = 5;
@@ -249,4 +263,174 @@ TEST(SortMergerTest, TEST_LIMIT) {
}
}
+TEST(SortMergerTest, LAST_BLOCK_WITH_EOS) {
+ /**
+ * in: [([NULL, 0, 1, 2, 3], eos = true)]
+ * [([NULL, 0, 1, 2, 3], eos = true)]
+ * [([NULL, 0, 1, 2, 3], eos = true)]
+ * [([NULL, 0, 1, 2, 3], eos = true)]
+ * [([NULL, 0, 1, 2, 3], eos = true)]
+ * offset = 0, limit = -1, NULL_FIRST, ASC
+ * out: [NULL, NULL, NULL, NULL, NULL], [0, 0, 0, 0, 0], [1, 1, 1, 1, 1], [2, 2, 2, 2, 2], [3, 3, 3, 3, 3], then an empty block with eos = true
+ */
+ const int num_children = 5;
+ const int batch_size = 5;
+ std::vector<int> round;
+ round.resize(num_children, 0);
+ const int num_round = 1;
+
+ std::unique_ptr<VSortedRunMerger> merger;
+ auto profile = std::make_shared<RuntimeProfile>("");
+ auto ordering_expr = MockSlotRef::create_mock_contexts(
+
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt64>()));
+ {
+ std::vector<bool> is_asc_order = {true};
+ std::vector<bool> nulls_first = {true};
+ const int limit = -1;
+ const int offset = 0;
+ merger.reset(new VSortedRunMerger(ordering_expr, is_asc_order,
nulls_first, batch_size,
+ limit, offset, profile.get()));
+ }
+ {
+ std::vector<vectorized::BlockSupplier> child_block_suppliers;
+ for (int child_idx = 0; child_idx < num_children; child_idx++) {
+ vectorized::BlockSupplier block_supplier =
+ [&, round_vec = &round, num_round = num_round, id =
child_idx](
+ vectorized::Block* block, bool* eos) {
+ *block =
ColumnHelper::create_nullable_block<DataTypeInt64>(
+ {0, (*round_vec)[id] + 0, (*round_vec)[id] + 1,
+ (*round_vec)[id] + 2, (*round_vec)[id] + 3},
+ {1, 0, 0, 0, 0});
+ *eos = ++((*round_vec)[id]) == num_round;
+ return Status::OK();
+ };
+ child_block_suppliers.push_back(block_supplier);
+ }
+ EXPECT_TRUE(merger->prepare(child_block_suppliers).ok());
+ }
+ {
+ for (int block_idx = 0; block_idx < num_children * num_round;
block_idx++) {
+ vectorized::Block block;
+ bool eos = false;
+ EXPECT_TRUE(merger->get_next(&block, &eos).ok());
+ auto expect_block = block_idx == 0
+ ?
ColumnHelper::create_nullable_column<DataTypeInt64>(
+ {0, 0, 0, 0, 0}, {1, 1, 1,
1, 1})
+ :
ColumnHelper::create_nullable_column<DataTypeInt64>(
+ {block_idx - 1, block_idx -
1, block_idx - 1,
+ block_idx - 1, block_idx -
1},
+ {0, 0, 0, 0, 0});
+
EXPECT_TRUE(ColumnHelper::column_equal(block.get_by_position(0).column,
expect_block));
+ EXPECT_EQ(block.rows(), batch_size);
+ EXPECT_FALSE(eos);
+ }
+ vectorized::Block block;
+ bool eos = false;
+ EXPECT_TRUE(merger->get_next(&block, &eos).ok());
+ EXPECT_EQ(block.rows(), 0);
+ EXPECT_TRUE(eos);
+ }
+}
+
+TEST(SortMergerTest, TEST_BIG_OFFSET_SINGLE_STREAM) {
+ /**
+ * in: [([NULL, 0, 1, 2, 3], eos = true)]
+ * offset = 20, limit = 1, NULL_FIRST, ASC
+ * out: [] (the offset skips every input row, so the first get_next returns an empty block with eos = true)
+ */
+ const int num_children = 1;
+ const int batch_size = 5;
+ std::vector<int> round;
+ round.resize(num_children, 0);
+ const int num_round = 1;
+
+ std::unique_ptr<VSortedRunMerger> merger;
+ auto profile = std::make_shared<RuntimeProfile>("");
+ auto ordering_expr = MockSlotRef::create_mock_contexts(
+
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt64>()));
+ {
+ std::vector<bool> is_asc_order = {true};
+ std::vector<bool> nulls_first = {true};
+ const int limit = 1;
+ const int offset = 20;
+ merger.reset(new VSortedRunMerger(ordering_expr, is_asc_order,
nulls_first, batch_size,
+ limit, offset, profile.get()));
+ }
+ {
+ std::vector<vectorized::BlockSupplier> child_block_suppliers;
+ for (int child_idx = 0; child_idx < num_children; child_idx++) {
+ vectorized::BlockSupplier block_supplier =
+ [&, round_vec = &round, num_round = num_round, id =
child_idx](
+ vectorized::Block* block, bool* eos) {
+ *block =
ColumnHelper::create_nullable_block<DataTypeInt64>(
+ {0, (*round_vec)[id] + 0, (*round_vec)[id] + 1,
+ (*round_vec)[id] + 2, (*round_vec)[id] + 3},
+ {1, 0, 0, 0, 0});
+ *eos = ++((*round_vec)[id]) == num_round;
+ return Status::OK();
+ };
+ child_block_suppliers.push_back(block_supplier);
+ }
+ EXPECT_TRUE(merger->prepare(child_block_suppliers).ok());
+ }
+ {
+ vectorized::Block block;
+ bool eos = false;
+ EXPECT_TRUE(merger->get_next(&block, &eos).ok());
+ EXPECT_EQ(block.rows(), 0);
+ EXPECT_TRUE(eos);
+ }
+}
+
+TEST(SortMergerTest, TEST_SMALL_OFFSET_SINGLE_STREAM) {
+ /**
+ * in: [([NULL, 0, 1, 2, 3], eos = true)]
+ * offset = 4, limit = 1, NULL_FIRST, ASC
+ * out: [3] (the first 4 rows are skipped; eos = true is reported together with this block)
+ */
+ const int num_children = 1;
+ const int batch_size = 5;
+ std::vector<int> round;
+ round.resize(num_children, 0);
+ const int num_round = 1;
+
+ std::unique_ptr<VSortedRunMerger> merger;
+ auto profile = std::make_shared<RuntimeProfile>("");
+ auto ordering_expr = MockSlotRef::create_mock_contexts(
+
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt64>()));
+ {
+ std::vector<bool> is_asc_order = {true};
+ std::vector<bool> nulls_first = {true};
+ const int limit = 1;
+ const int offset = 4;
+ merger.reset(new VSortedRunMerger(ordering_expr, is_asc_order,
nulls_first, batch_size,
+ limit, offset, profile.get()));
+ }
+ {
+ std::vector<vectorized::BlockSupplier> child_block_suppliers;
+ for (int child_idx = 0; child_idx < num_children; child_idx++) {
+ vectorized::BlockSupplier block_supplier =
+ [&, round_vec = &round, num_round = num_round, id =
child_idx](
+ vectorized::Block* block, bool* eos) {
+ *block =
ColumnHelper::create_nullable_block<DataTypeInt64>(
+ {0, (*round_vec)[id] + 0, (*round_vec)[id] + 1,
+ (*round_vec)[id] + 2, (*round_vec)[id] + 3},
+ {1, 0, 0, 0, 0});
+ *eos = ++((*round_vec)[id]) == num_round;
+ return Status::OK();
+ };
+ child_block_suppliers.push_back(block_supplier);
+ }
+ EXPECT_TRUE(merger->prepare(child_block_suppliers).ok());
+ }
+ {
+ vectorized::Block block;
+ bool eos = false;
+ EXPECT_TRUE(merger->get_next(&block, &eos).ok());
+ auto expect_block =
ColumnHelper::create_nullable_column<DataTypeInt64>({3}, {0});
+
EXPECT_TRUE(ColumnHelper::column_equal(block.get_by_position(0).column,
expect_block));
+ EXPECT_TRUE(eos);
+ }
+}
+
} // namespace doris::vectorized
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]