This is an automated email from the ASF dual-hosted git repository.
HappenLee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 974f9bdc89b [opt](sort) avoid copying whole blocks during merge (#63429)
974f9bdc89b is described below
commit 974f9bdc89bfc7258ad124dcc6f1fecf3f4be0f8
Author: Mryange <[email protected]>
AuthorDate: Thu May 21 12:08:17 2026 +0800
[opt](sort) avoid copying whole blocks during merge (#63429)
Problem Summary: `MergeSorterState` used the generic copy-based merge
path even when the current top sorted run could return its whole
remaining block before any other run. This change adds a direct
whole-block fast path guarded by a total-order check, avoiding
unnecessary `insert_range_from` work in the inner merge.
### What is changed?
- Add `MergeSortCursor::totally_less_or_equals()` to detect when the
last row of the current run sorts at or before the current row of the
next child, i.e. the whole remaining run can be emitted first.
- Return the current block directly from
`MergeSorterState::_merge_sort_read_impl()` when the whole-block
condition is satisfied.
- Add focused BE unit tests for exact-batch and smaller-than-batch
whole-block fast-path cases.
---
be/src/exec/sort/sort_cursor.h | 5 +++
be/src/exec/sort/sorter.cpp | 18 ++++++++
be/test/exec/operator/sort_operator_test.cpp | 19 ++++----
be/test/exec/sort/heap_sorter_test.cpp | 16 +++----
be/test/exec/sort/merge_sorter_state.cpp | 67 ++++++++++++++++++++++++++++
5 files changed, 107 insertions(+), 18 deletions(-)
diff --git a/be/src/exec/sort/sort_cursor.h b/be/src/exec/sort/sort_cursor.h
index 0bff8bca911..a13906c2be5 100644
--- a/be/src/exec/sort/sort_cursor.h
+++ b/be/src/exec/sort/sort_cursor.h
@@ -204,6 +204,11 @@ struct MergeSortCursor {
return !impl->empty() && greater_at(rhs, impl->pos, rhs.impl->pos) > 0;
}
+ bool totally_less_or_equals(const MergeSortCursor& rhs) const {
+ return !impl->empty() && !rhs.impl->empty() &&
+ greater_at(rhs, impl->rows - 1, rhs.impl->pos) <= 0;
+ }
+
/// Inverted so that the priority queue elements are removed in ascending order.
bool operator<(const MergeSortCursor& rhs) const { return greater(rhs); }
diff --git a/be/src/exec/sort/sorter.cpp b/be/src/exec/sort/sorter.cpp
index 88160819328..616cc2145a2 100644
--- a/be/src/exec/sort/sorter.cpp
+++ b/be/src/exec/sort/sorter.cpp
@@ -94,6 +94,24 @@ Status MergeSorterState::merge_sort_read(doris::Block*
block, int batch_size, bo
}
void MergeSorterState::_merge_sort_read_impl(int batch_size, doris::Block*
block, bool* eos) {
+ if (_queue.is_valid() && batch_size > 0) {
+ auto [current, current_rows] = _queue.current();
+ current_rows = std::min(current_rows, static_cast<size_t>(batch_size));
+ const size_t step = std::min(_offset, current_rows);
+
+ // Fast path when the current top run can contribute its whole remaining block
+ // before any other run. The returned block stays within batch_size because
+ // is_last(current_rows) can only hold after the min(batch_size, queue_batch_size)
+ // clamp above.
+ if (step == 0 && current->impl->is_first() &&
current->impl->is_last(current_rows) &&
+ (_queue.size() == 1 ||
(*current).totally_less_or_equals(_queue.next_child()))) {
+ current->impl->block->swap(*block);
+ _queue.remove_top();
+ *eos = false;
+ return;
+ }
+ }
+
size_t num_columns = unsorted_block()->columns();
MutableBlock m_block =
VectorizedUtils::build_mutable_mem_reuse_block(block, *unsorted_block());
diff --git a/be/test/exec/operator/sort_operator_test.cpp
b/be/test/exec/operator/sort_operator_test.cpp
index 210ad1a71bd..310f3ffb4b6 100644
--- a/be/test/exec/operator/sort_operator_test.cpp
+++ b/be/test/exec/operator/sort_operator_test.cpp
@@ -193,21 +193,20 @@ TEST_F(SortOperatorTest, test_dep) {
EXPECT_TRUE(is_ready(source_local_state->dependencies()));
{
- Block block = ColumnHelper::create_block<DataTypeInt64>({});
+ MutableBlock merged_block =
ColumnHelper::create_block<DataTypeInt64>({});
bool eos = false;
- auto st = source->get_block(state.get(), &block, &eos);
- EXPECT_TRUE(st.ok()) << st.msg();
- EXPECT_FALSE(eos);
+ while (!eos) {
+ Block block;
+ auto st = source->get_block(state.get(), &block, &eos);
+ EXPECT_TRUE(st.ok()) << st.msg();
+ EXPECT_TRUE(merged_block.merge(block));
+ }
+
+ auto block = merged_block.to_block();
EXPECT_EQ(block.rows(), 6);
std::cout << block.dump_data() << std::endl;
EXPECT_TRUE(ColumnHelper::block_equal(
block, ColumnHelper::create_block<DataTypeInt64>({1, 2, 3, 4,
5, 6})));
-
- block.clear();
- st = source->get_block(state.get(), &block, &eos);
- EXPECT_TRUE(st.ok()) << st.msg();
- EXPECT_TRUE(eos);
- EXPECT_EQ(block.rows(), 0);
}
}
diff --git a/be/test/exec/sort/heap_sorter_test.cpp
b/be/test/exec/sort/heap_sorter_test.cpp
index 90b06764175..9c91db2e5e3 100644
--- a/be/test/exec/sort/heap_sorter_test.cpp
+++ b/be/test/exec/sort/heap_sorter_test.cpp
@@ -100,20 +100,20 @@ TEST_F(HeapSorterTest, test_topn_sorter1) {
EXPECT_TRUE(sorter->prepare_for_read(false));
{
- Block block;
+ MutableBlock merged_block =
ColumnHelper::create_block<DataTypeInt64>({}, {});
bool eos = false;
- EXPECT_TRUE(sorter->get_next(&_state, &block, &eos));
+ while (!eos) {
+ Block block;
+ EXPECT_TRUE(sorter->get_next(&_state, &block, &eos));
+ EXPECT_TRUE(merged_block.merge(block));
+ }
+
+ auto block = merged_block.to_block();
EXPECT_EQ(block.rows(), 6);
EXPECT_TRUE(ColumnHelper::block_equal(
block,
Block
{ColumnHelper::create_column_with_name<DataTypeInt64>({1, 2, 3, 4, 5, 6}),
ColumnHelper::create_column_with_name<DataTypeInt64>({1, 2, 3, 4, 5, 6})}));
-
- block.clear_column_data();
-
- EXPECT_TRUE(sorter->get_next(&_state, &block, &eos));
- EXPECT_EQ(block.rows(), 0);
- EXPECT_EQ(eos, true);
}
}
diff --git a/be/test/exec/sort/merge_sorter_state.cpp
b/be/test/exec/sort/merge_sorter_state.cpp
index 0dc8a1a8937..7af89e7cbdf 100644
--- a/be/test/exec/sort/merge_sorter_state.cpp
+++ b/be/test/exec/sort/merge_sorter_state.cpp
@@ -101,4 +101,71 @@ TEST_F(MergeSorterStateTest, test1) {
ColumnHelper::create_block<DataTypeInt64>({5, 6})));
}
}
+
+TEST_F(MergeSorterStateTest, whole_block_fast_path_swaps_block) {
+ state.reset(new MergeSorterState(*row_desc, 0));
+ auto first_block = create_block({1, 2, 3});
+ auto second_block = create_block({4, 5, 6});
+ auto first_column = first_block->get_by_position(0).column;
+
+ state->add_sorted_block(first_block);
+ state->add_sorted_block(second_block);
+
+ SortDescription desc {SortColumnDescription {0, 1, -1}};
+ ASSERT_TRUE(state->build_merge_tree(desc));
+
+ Block block;
+ bool eos = false;
+ Status status = state->merge_sort_read(&block, 3, &eos);
+ ASSERT_TRUE(status.ok());
+ EXPECT_FALSE(eos);
+ EXPECT_TRUE(
+ ColumnHelper::block_equal(block,
ColumnHelper::create_block<DataTypeInt64>({1, 2, 3})));
+ EXPECT_EQ(block.get_by_position(0).column.get(), first_column.get());
+}
+
+TEST_F(MergeSorterStateTest, whole_block_fast_path_allows_smaller_than_batch) {
+ state.reset(new MergeSorterState(*row_desc, 0));
+ auto first_block = create_block({1, 2, 3});
+ auto second_block = create_block({4, 5, 6});
+ auto first_column = first_block->get_by_position(0).column;
+ auto second_column = second_block->get_by_position(0).column;
+
+ state->add_sorted_block(first_block);
+ state->add_sorted_block(second_block);
+
+ SortDescription desc {SortColumnDescription {0, 1, -1}};
+ ASSERT_TRUE(state->build_merge_tree(desc));
+
+ {
+ Block block;
+ bool eos = false;
+ Status status = state->merge_sort_read(&block, 4, &eos);
+ ASSERT_TRUE(status.ok());
+ EXPECT_FALSE(eos);
+ EXPECT_TRUE(ColumnHelper::block_equal(
+ block, ColumnHelper::create_block<DataTypeInt64>({1, 2, 3})));
+ EXPECT_EQ(block.get_by_position(0).column.get(), first_column.get());
+ }
+
+ {
+ Block block;
+ bool eos = false;
+ Status status = state->merge_sort_read(&block, 4, &eos);
+ ASSERT_TRUE(status.ok());
+ EXPECT_FALSE(eos);
+ EXPECT_TRUE(ColumnHelper::block_equal(
+ block, ColumnHelper::create_block<DataTypeInt64>({4, 5, 6})));
+ EXPECT_EQ(block.get_by_position(0).column.get(), second_column.get());
+ }
+
+ {
+ Block block;
+ bool eos = false;
+ Status status = state->merge_sort_read(&block, 4, &eos);
+ ASSERT_TRUE(status.ok());
+ EXPECT_TRUE(eos);
+ EXPECT_EQ(block.rows(), 0);
+ }
+}
} // namespace doris
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]