This is an automated email from the ASF dual-hosted git repository.

HappenLee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 974f9bdc89b [opt](sort) avoid copying whole blocks during merge (#63429)
974f9bdc89b is described below

commit 974f9bdc89bfc7258ad124dcc6f1fecf3f4be0f8
Author: Mryange <[email protected]>
AuthorDate: Thu May 21 12:08:17 2026 +0800

    [opt](sort) avoid copying whole blocks during merge (#63429)
    
    Problem Summary: `MergeSorterState` used the generic copy-based merge
    path even when the current top sorted run could return its whole
    remaining block before any other run. This adds a direct whole-block
    fast path guarded by a total-order check, avoiding unnecessary
    `insert_range_from` work in inner merge.
    
    ### What is changed?
    
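    - Add `MergeSortCursor::totally_less_or_equals()` to detect when the
    last row of the current run's block is not greater than the next
    child's current row (both cursors non-empty), i.e. the current block
    sorts wholly before the next child.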
    - Return the current block directly from
    `MergeSorterState::_merge_sort_read_impl()` when the whole-block
    condition is satisfied.
    - Add focused BE unit tests for exact-batch and smaller-than-batch
    whole-block fast-path cases.
---
 be/src/exec/sort/sort_cursor.h               |  5 +++
 be/src/exec/sort/sorter.cpp                  | 18 ++++++++
 be/test/exec/operator/sort_operator_test.cpp | 19 ++++----
 be/test/exec/sort/heap_sorter_test.cpp       | 16 +++----
 be/test/exec/sort/merge_sorter_state.cpp     | 67 ++++++++++++++++++++++++++++
 5 files changed, 107 insertions(+), 18 deletions(-)

diff --git a/be/src/exec/sort/sort_cursor.h b/be/src/exec/sort/sort_cursor.h
index 0bff8bca911..a13906c2be5 100644
--- a/be/src/exec/sort/sort_cursor.h
+++ b/be/src/exec/sort/sort_cursor.h
@@ -204,6 +204,11 @@ struct MergeSortCursor {
         return !impl->empty() && greater_at(rhs, impl->pos, rhs.impl->pos) > 0;
     }
 
+    bool totally_less_or_equals(const MergeSortCursor& rhs) const {
+        return !impl->empty() && !rhs.impl->empty() &&
+               greater_at(rhs, impl->rows - 1, rhs.impl->pos) <= 0;
+    }
+
     /// Inverted so that the priority queue elements are removed in ascending 
order.
     bool operator<(const MergeSortCursor& rhs) const { return greater(rhs); }
 
diff --git a/be/src/exec/sort/sorter.cpp b/be/src/exec/sort/sorter.cpp
index 88160819328..616cc2145a2 100644
--- a/be/src/exec/sort/sorter.cpp
+++ b/be/src/exec/sort/sorter.cpp
@@ -94,6 +94,24 @@ Status MergeSorterState::merge_sort_read(doris::Block* 
block, int batch_size, bo
 }
 
 void MergeSorterState::_merge_sort_read_impl(int batch_size, doris::Block* 
block, bool* eos) {
+    if (_queue.is_valid() && batch_size > 0) {
+        auto [current, current_rows] = _queue.current();
+        current_rows = std::min(current_rows, static_cast<size_t>(batch_size));
+        const size_t step = std::min(_offset, current_rows);
+
+        // Fast path when the current top run can contribute its whole 
remaining block
+        // before any other run. The returned block stays within batch_size 
because
+        // is_last(current_rows) can only hold after the min(batch_size, 
queue_batch_size)
+        // clamp above.
+        if (step == 0 && current->impl->is_first() && 
current->impl->is_last(current_rows) &&
+            (_queue.size() == 1 || 
(*current).totally_less_or_equals(_queue.next_child()))) {
+            current->impl->block->swap(*block);
+            _queue.remove_top();
+            *eos = false;
+            return;
+        }
+    }
+
     size_t num_columns = unsorted_block()->columns();
 
     MutableBlock m_block = 
VectorizedUtils::build_mutable_mem_reuse_block(block, *unsorted_block());
diff --git a/be/test/exec/operator/sort_operator_test.cpp 
b/be/test/exec/operator/sort_operator_test.cpp
index 210ad1a71bd..310f3ffb4b6 100644
--- a/be/test/exec/operator/sort_operator_test.cpp
+++ b/be/test/exec/operator/sort_operator_test.cpp
@@ -193,21 +193,20 @@ TEST_F(SortOperatorTest, test_dep) {
     EXPECT_TRUE(is_ready(source_local_state->dependencies()));
 
     {
-        Block block = ColumnHelper::create_block<DataTypeInt64>({});
+        MutableBlock merged_block = 
ColumnHelper::create_block<DataTypeInt64>({});
         bool eos = false;
-        auto st = source->get_block(state.get(), &block, &eos);
-        EXPECT_TRUE(st.ok()) << st.msg();
-        EXPECT_FALSE(eos);
+        while (!eos) {
+            Block block;
+            auto st = source->get_block(state.get(), &block, &eos);
+            EXPECT_TRUE(st.ok()) << st.msg();
+            EXPECT_TRUE(merged_block.merge(block));
+        }
+
+        auto block = merged_block.to_block();
         EXPECT_EQ(block.rows(), 6);
         std::cout << block.dump_data() << std::endl;
         EXPECT_TRUE(ColumnHelper::block_equal(
                 block, ColumnHelper::create_block<DataTypeInt64>({1, 2, 3, 4, 
5, 6})));
-
-        block.clear();
-        st = source->get_block(state.get(), &block, &eos);
-        EXPECT_TRUE(st.ok()) << st.msg();
-        EXPECT_TRUE(eos);
-        EXPECT_EQ(block.rows(), 0);
     }
 }
 
diff --git a/be/test/exec/sort/heap_sorter_test.cpp 
b/be/test/exec/sort/heap_sorter_test.cpp
index 90b06764175..9c91db2e5e3 100644
--- a/be/test/exec/sort/heap_sorter_test.cpp
+++ b/be/test/exec/sort/heap_sorter_test.cpp
@@ -100,20 +100,20 @@ TEST_F(HeapSorterTest, test_topn_sorter1) {
     EXPECT_TRUE(sorter->prepare_for_read(false));
 
     {
-        Block block;
+        MutableBlock merged_block = 
ColumnHelper::create_block<DataTypeInt64>({}, {});
         bool eos = false;
-        EXPECT_TRUE(sorter->get_next(&_state, &block, &eos));
+        while (!eos) {
+            Block block;
+            EXPECT_TRUE(sorter->get_next(&_state, &block, &eos));
+            EXPECT_TRUE(merged_block.merge(block));
+        }
+
+        auto block = merged_block.to_block();
         EXPECT_EQ(block.rows(), 6);
         EXPECT_TRUE(ColumnHelper::block_equal(
                 block,
                 Block 
{ColumnHelper::create_column_with_name<DataTypeInt64>({1, 2, 3, 4, 5, 6}),
                        
ColumnHelper::create_column_with_name<DataTypeInt64>({1, 2, 3, 4, 5, 6})}));
-
-        block.clear_column_data();
-
-        EXPECT_TRUE(sorter->get_next(&_state, &block, &eos));
-        EXPECT_EQ(block.rows(), 0);
-        EXPECT_EQ(eos, true);
     }
 }
 
diff --git a/be/test/exec/sort/merge_sorter_state.cpp 
b/be/test/exec/sort/merge_sorter_state.cpp
index 0dc8a1a8937..7af89e7cbdf 100644
--- a/be/test/exec/sort/merge_sorter_state.cpp
+++ b/be/test/exec/sort/merge_sorter_state.cpp
@@ -101,4 +101,71 @@ TEST_F(MergeSorterStateTest, test1) {
                                               
ColumnHelper::create_block<DataTypeInt64>({5, 6})));
     }
 }
+
+TEST_F(MergeSorterStateTest, whole_block_fast_path_swaps_block) {
+    state.reset(new MergeSorterState(*row_desc, 0));
+    auto first_block = create_block({1, 2, 3});
+    auto second_block = create_block({4, 5, 6});
+    auto first_column = first_block->get_by_position(0).column;
+
+    state->add_sorted_block(first_block);
+    state->add_sorted_block(second_block);
+
+    SortDescription desc {SortColumnDescription {0, 1, -1}};
+    ASSERT_TRUE(state->build_merge_tree(desc));
+
+    Block block;
+    bool eos = false;
+    Status status = state->merge_sort_read(&block, 3, &eos);
+    ASSERT_TRUE(status.ok());
+    EXPECT_FALSE(eos);
+    EXPECT_TRUE(
+            ColumnHelper::block_equal(block, 
ColumnHelper::create_block<DataTypeInt64>({1, 2, 3})));
+    EXPECT_EQ(block.get_by_position(0).column.get(), first_column.get());
+}
+
+TEST_F(MergeSorterStateTest, whole_block_fast_path_allows_smaller_than_batch) {
+    state.reset(new MergeSorterState(*row_desc, 0));
+    auto first_block = create_block({1, 2, 3});
+    auto second_block = create_block({4, 5, 6});
+    auto first_column = first_block->get_by_position(0).column;
+    auto second_column = second_block->get_by_position(0).column;
+
+    state->add_sorted_block(first_block);
+    state->add_sorted_block(second_block);
+
+    SortDescription desc {SortColumnDescription {0, 1, -1}};
+    ASSERT_TRUE(state->build_merge_tree(desc));
+
+    {
+        Block block;
+        bool eos = false;
+        Status status = state->merge_sort_read(&block, 4, &eos);
+        ASSERT_TRUE(status.ok());
+        EXPECT_FALSE(eos);
+        EXPECT_TRUE(ColumnHelper::block_equal(
+                block, ColumnHelper::create_block<DataTypeInt64>({1, 2, 3})));
+        EXPECT_EQ(block.get_by_position(0).column.get(), first_column.get());
+    }
+
+    {
+        Block block;
+        bool eos = false;
+        Status status = state->merge_sort_read(&block, 4, &eos);
+        ASSERT_TRUE(status.ok());
+        EXPECT_FALSE(eos);
+        EXPECT_TRUE(ColumnHelper::block_equal(
+                block, ColumnHelper::create_block<DataTypeInt64>({4, 5, 6})));
+        EXPECT_EQ(block.get_by_position(0).column.get(), second_column.get());
+    }
+
+    {
+        Block block;
+        bool eos = false;
+        Status status = state->merge_sort_read(&block, 4, &eos);
+        ASSERT_TRUE(status.ok());
+        EXPECT_TRUE(eos);
+        EXPECT_EQ(block.rows(), 0);
+    }
+}
 } // namespace doris
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to