This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 24b37cf43fc [Bug](spill) fix wrong offset process on spill sort (#53672)
24b37cf43fc is described below
commit 24b37cf43fcdfb752b1004cfad3241712222927b
Author: Pxl <[email protected]>
AuthorDate: Mon Jul 28 14:27:41 2025 +0800
[Bug](spill) fix wrong offset process on spill sort (#53672)
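Fix incorrect offset handling in spill sort. When the sort spills, each spilled run is now produced with limit + offset rows and no offset applied (FullSorter::prepare_for_read(true)). The query's real limit and offset are recorded in SpillSortSharedState at the first spill and applied only in the final merge round; intermediate merge rounds use no limit and no offset. Previously the in-memory sorter's limit/offset were passed to every merge round, so the offset could be applied more than once.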
---
be/src/pipeline/common/partition_sort_utils.cpp | 2 +-
be/src/pipeline/dependency.h | 2 +
.../pipeline/exec/partition_sort_sink_operator.cpp | 2 +-
be/src/pipeline/exec/sort_sink_operator.cpp | 4 +-
be/src/pipeline/exec/sort_sink_operator.h | 3 ++
be/src/pipeline/exec/spill_sort_sink_operator.cpp | 7 +++-
.../pipeline/exec/spill_sort_source_operator.cpp | 14 +++++--
be/src/vec/common/sort/heap_sorter.cpp | 5 ++-
be/src/vec/common/sort/heap_sorter.h | 2 +-
be/src/vec/common/sort/partition_sorter.cpp | 5 ++-
be/src/vec/common/sort/partition_sorter.h | 2 +-
be/src/vec/common/sort/sorter.cpp | 7 +++-
be/src/vec/common/sort/sorter.h | 6 ++-
be/src/vec/common/sort/topn_sorter.cpp | 5 ++-
be/src/vec/common/sort/topn_sorter.h | 2 +-
be/test/testutil/mock/mock_sorter.h | 2 +-
be/test/vec/exec/sort/heap_sorter_test.cpp | 2 +-
be/test/vec/exec/sort/partition_sorter_test.cpp | 6 +--
be/test/vec/exec/sort/sort_test.cpp | 2 +-
.../data/query_p0/sort_spill/sort_spill.out | Bin 0 -> 280 bytes
.../suites/query_p0/sort_spill/sort_spill.groovy | 41 +++++++++++++++++++++
21 files changed, 97 insertions(+), 24 deletions(-)
diff --git a/be/src/pipeline/common/partition_sort_utils.cpp
b/be/src/pipeline/common/partition_sort_utils.cpp
index b6fdbd5915e..cf6b20048db 100644
--- a/be/src/pipeline/common/partition_sort_utils.cpp
+++ b/be/src/pipeline/common/partition_sort_utils.cpp
@@ -75,7 +75,7 @@ Status PartitionBlocks::do_partition_topn_sort() {
RETURN_IF_ERROR(_partition_topn_sorter->append_block(block.get()));
}
_blocks.clear();
- RETURN_IF_ERROR(_partition_topn_sorter->prepare_for_read());
+ RETURN_IF_ERROR(_partition_topn_sorter->prepare_for_read(false));
bool current_eos = false;
while (!current_eos) {
// output_block maybe need better way
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index eec06b115cc..3a3f497330c 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -529,6 +529,8 @@ struct SpillSortSharedState : public BasicSharedState,
SortSharedState* in_mem_shared_state = nullptr;
bool enable_spill = false;
bool is_spilled = false;
+ int64_t limit = -1;
+ int64_t offset = 0;
std::atomic_bool is_closed = false;
std::shared_ptr<BasicSharedState> in_mem_shared_state_sptr;
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
index ea3b94d4320..cc759ad0547 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -152,7 +152,7 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
RETURN_IF_ERROR(sorter->append_block(block.get()));
}
local_state._value_places[i]->_blocks.clear();
- RETURN_IF_ERROR(sorter->prepare_for_read());
+ RETURN_IF_ERROR(sorter->prepare_for_read(false));
INJECT_MOCK_SLEEP(std::unique_lock<std::mutex> lc(
local_state._shared_state->prepared_finish_lock));
sorter->set_prepared_finish();
diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp
b/be/src/pipeline/exec/sort_sink_operator.cpp
index 8b72bf6b9d1..2a7b329a10b 100644
--- a/be/src/pipeline/exec/sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/sort_sink_operator.cpp
@@ -160,7 +160,7 @@ Status SortSinkOperatorX::sink(doris::RuntimeState* state,
vectorized::Block* in
}
if (eos) {
- RETURN_IF_ERROR(local_state._shared_state->sorter->prepare_for_read());
+
RETURN_IF_ERROR(local_state._shared_state->sorter->prepare_for_read(false));
local_state._dependency->set_ready_to_read();
}
return Status::OK();
@@ -178,7 +178,7 @@ size_t
SortSinkOperatorX::get_revocable_mem_size(RuntimeState* state) const {
Status SortSinkOperatorX::prepare_for_spill(RuntimeState* state) {
auto& local_state = get_local_state(state);
- return local_state._shared_state->sorter->prepare_for_read();
+ return local_state._shared_state->sorter->prepare_for_read(true);
}
Status SortSinkOperatorX::merge_sort_read_for_spill(RuntimeState* state,
diff --git a/be/src/pipeline/exec/sort_sink_operator.h
b/be/src/pipeline/exec/sort_sink_operator.h
index be37cef07dc..a8d1e9005b2 100644
--- a/be/src/pipeline/exec/sort_sink_operator.h
+++ b/be/src/pipeline/exec/sort_sink_operator.h
@@ -101,6 +101,9 @@ public:
int batch_size, bool* eos);
void reset(RuntimeState* state);
+ int64_t limit() const { return _limit; }
+ int64_t offset() const { return _offset; }
+
private:
friend class SortSinkLocalState;
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index d6ba8ec6414..2a10baaf093 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -161,7 +161,8 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState*
state, vectorized::Bloc
}
} else {
RETURN_IF_ERROR(
-
local_state._shared_state->in_mem_shared_state->sorter->prepare_for_read());
+
local_state._shared_state->in_mem_shared_state->sorter->prepare_for_read(
+ false));
local_state._dependency->set_ready_to_read();
}
}
@@ -176,8 +177,11 @@ size_t
SpillSortSinkLocalState::get_reserve_mem_size(RuntimeState* state, bool e
Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
const
std::shared_ptr<SpillContext>& spill_context) {
+ auto& parent = Base::_parent->template cast<Parent>();
if (!_shared_state->is_spilled) {
_shared_state->is_spilled = true;
+ _shared_state->limit = parent._sort_sink_operator->limit();
+ _shared_state->offset = parent._sort_sink_operator->offset();
custom_profile()->add_info_string("Spilled", "true");
}
@@ -193,7 +197,6 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState*
state,
_shared_state->sorted_streams.emplace_back(_spilling_stream);
- auto& parent = Base::_parent->template cast<Parent>();
auto query_id = state->query_id();
auto spill_func = [this, state, query_id, &parent] {
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp
b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index de28bf60305..550e7789346 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -19,6 +19,8 @@
#include <glog/logging.h>
+#include <cstdint>
+
#include "common/status.h"
#include "pipeline/exec/spill_utils.h"
#include "pipeline/pipeline_task.h"
@@ -179,10 +181,16 @@ Status
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
Status SpillSortLocalState::_create_intermediate_merger(
int num_blocks, const vectorized::SortDescription& sort_description) {
std::vector<vectorized::BlockSupplier> child_block_suppliers;
+ int64_t limit = -1;
+ int64_t offset = 0;
+ if (num_blocks >= _shared_state->sorted_streams.size()) {
+ // final round use real limit and offset
+ limit = Base::_shared_state->limit;
+ offset = Base::_shared_state->offset;
+ }
+
_merger = std::make_unique<vectorized::VSortedRunMerger>(
- sort_description, _runtime_state->batch_size(),
- Base::_shared_state->in_mem_shared_state->sorter->limit(),
- Base::_shared_state->in_mem_shared_state->sorter->offset(),
custom_profile());
+ sort_description, _runtime_state->batch_size(), limit, offset,
custom_profile());
_current_merging_streams.clear();
for (int i = 0; i < num_blocks && !_shared_state->sorted_streams.empty();
++i) {
diff --git a/be/src/vec/common/sort/heap_sorter.cpp
b/be/src/vec/common/sort/heap_sorter.cpp
index ec12e61ebf2..c1b6a735afd 100644
--- a/be/src/vec/common/sort/heap_sorter.cpp
+++ b/be/src/vec/common/sort/heap_sorter.cpp
@@ -51,7 +51,10 @@ Status HeapSorter::append_block(Block* block) {
return Status::OK();
}
-Status HeapSorter::prepare_for_read() {
+Status HeapSorter::prepare_for_read(bool is_spill) {
+ if (is_spill) {
+ return Status::InternalError("HeapSorter does not support spill");
+ }
while (_queue.is_valid()) {
auto [current, current_rows] = _queue.current();
if (current_rows) {
diff --git a/be/src/vec/common/sort/heap_sorter.h
b/be/src/vec/common/sort/heap_sorter.h
index 51b14ff0f1b..35108cbafc2 100644
--- a/be/src/vec/common/sort/heap_sorter.h
+++ b/be/src/vec/common/sort/heap_sorter.h
@@ -34,7 +34,7 @@ public:
Status append_block(Block* block) override;
- Status prepare_for_read() override;
+ Status prepare_for_read(bool is_spill) override;
Status get_next(RuntimeState* state, Block* block, bool* eos) override;
diff --git a/be/src/vec/common/sort/partition_sorter.cpp
b/be/src/vec/common/sort/partition_sorter.cpp
index 0e22bf2fc21..305a803c9e0 100644
--- a/be/src/vec/common/sort/partition_sorter.cpp
+++ b/be/src/vec/common/sort/partition_sorter.cpp
@@ -63,7 +63,10 @@ Status PartitionSorter::append_block(Block* input_block) {
return Status::OK();
}
-Status PartitionSorter::prepare_for_read() {
+Status PartitionSorter::prepare_for_read(bool is_spill) {
+ if (is_spill) {
+ return Status::InternalError("PartitionSorter does not support spill");
+ }
auto& blocks = _state->get_sorted_block();
auto& queue = _state->get_queue();
std::vector<MergeSortCursor> cursors;
diff --git a/be/src/vec/common/sort/partition_sorter.h
b/be/src/vec/common/sort/partition_sorter.h
index d20ea1bd220..e7d3f37941f 100644
--- a/be/src/vec/common/sort/partition_sorter.h
+++ b/be/src/vec/common/sort/partition_sorter.h
@@ -88,7 +88,7 @@ public:
Status append_block(Block* block) override;
- Status prepare_for_read() override;
+ Status prepare_for_read(bool is_spill) override;
Status get_next(RuntimeState* state, Block* block, bool* eos) override;
diff --git a/be/src/vec/common/sort/sorter.cpp
b/be/src/vec/common/sort/sorter.cpp
index 4901b4d77b0..951281f1383 100644
--- a/be/src/vec/common/sort/sorter.cpp
+++ b/be/src/vec/common/sort/sorter.cpp
@@ -262,7 +262,12 @@ Status FullSorter::append_block(Block* block) {
return Status::OK();
}
-Status FullSorter::prepare_for_read() {
+Status FullSorter::prepare_for_read(bool is_spill) {
+ if (is_spill) {
+ _limit += _offset;
+ _offset = 0;
+ _state->ignore_offset();
+ }
if (_state->unsorted_block()->rows() > 0) {
RETURN_IF_ERROR(_do_sort());
}
diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h
index ca33a9eacfa..149939b9bd9 100644
--- a/be/src/vec/common/sort/sorter.h
+++ b/be/src/vec/common/sort/sorter.h
@@ -83,6 +83,8 @@ public:
std::unique_ptr<Block>& unsorted_block() { return _unsorted_block; }
+ void ignore_offset() { _offset = 0; }
+
private:
void _merge_sort_read_impl(int batch_size, doris::vectorized::Block*
block, bool* eos);
@@ -129,7 +131,7 @@ public:
virtual Status append_block(Block* block) = 0;
- virtual Status prepare_for_read() = 0;
+ virtual Status prepare_for_read(bool is_spill) = 0;
virtual Status get_next(RuntimeState* state, Block* block, bool* eos) = 0;
@@ -182,7 +184,7 @@ public:
Status append_block(Block* block) override;
- Status prepare_for_read() override;
+ Status prepare_for_read(bool is_spill) override;
Status get_next(RuntimeState* state, Block* block, bool* eos) override;
diff --git a/be/src/vec/common/sort/topn_sorter.cpp
b/be/src/vec/common/sort/topn_sorter.cpp
index fe3cecca5cd..daacd064118 100644
--- a/be/src/vec/common/sort/topn_sorter.cpp
+++ b/be/src/vec/common/sort/topn_sorter.cpp
@@ -53,7 +53,10 @@ Status TopNSorter::append_block(Block* block) {
return Status::OK();
}
-Status TopNSorter::prepare_for_read() {
+Status TopNSorter::prepare_for_read(bool is_spill) {
+ if (is_spill) {
+ return Status::InternalError("TopN sorter does not support spill");
+ }
return _state->build_merge_tree(_sort_description);
}
diff --git a/be/src/vec/common/sort/topn_sorter.h
b/be/src/vec/common/sort/topn_sorter.h
index 54a2e838ffc..80e280cd802 100644
--- a/be/src/vec/common/sort/topn_sorter.h
+++ b/be/src/vec/common/sort/topn_sorter.h
@@ -52,7 +52,7 @@ public:
Status append_block(Block* block) override;
- Status prepare_for_read() override;
+ Status prepare_for_read(bool is_spill) override;
Status get_next(RuntimeState* state, Block* block, bool* eos) override;
diff --git a/be/test/testutil/mock/mock_sorter.h
b/be/test/testutil/mock/mock_sorter.h
index 8b31484741c..30e87f741ca 100644
--- a/be/test/testutil/mock/mock_sorter.h
+++ b/be/test/testutil/mock/mock_sorter.h
@@ -24,7 +24,7 @@ struct MockSorter : public Sorter {
MockSorter() = default;
Status append_block(Block* block) override { return Status::OK(); }
- Status prepare_for_read() override { return Status::OK(); }
+ Status prepare_for_read(bool is_spill) override { return Status::OK(); }
Status get_next(RuntimeState* state, Block* block, bool* eos) override {
*eos = true;
diff --git a/be/test/vec/exec/sort/heap_sorter_test.cpp
b/be/test/vec/exec/sort/heap_sorter_test.cpp
index ed83368c294..14be58b1618 100644
--- a/be/test/vec/exec/sort/heap_sorter_test.cpp
+++ b/be/test/vec/exec/sort/heap_sorter_test.cpp
@@ -111,7 +111,7 @@ TEST_F(HeapSorterTest, test_topn_sorter1) {
EXPECT_EQ(value, real);
}
- EXPECT_TRUE(sorter->prepare_for_read());
+ EXPECT_TRUE(sorter->prepare_for_read(false));
{
Block block;
diff --git a/be/test/vec/exec/sort/partition_sorter_test.cpp
b/be/test/vec/exec/sort/partition_sorter_test.cpp
index 00baa50826a..9c8fad5ff47 100644
--- a/be/test/vec/exec/sort/partition_sorter_test.cpp
+++ b/be/test/vec/exec/sort/partition_sorter_test.cpp
@@ -94,7 +94,7 @@ TEST_F(PartitionSorterTest,
test_partition_sorter_read_row_num) {
}
{
- auto st = sorter->prepare_for_read();
+ auto st = sorter->prepare_for_read(false);
EXPECT_TRUE(st.ok()) << st.msg();
}
{
@@ -140,7 +140,7 @@ TEST_F(PartitionSorterTest,
test_partition_sorter_DENSE_RANK) {
}
{
- auto st = sorter->prepare_for_read();
+ auto st = sorter->prepare_for_read(false);
EXPECT_TRUE(st.ok()) << st.msg();
}
{
@@ -179,7 +179,7 @@ TEST_F(PartitionSorterTest, test_partition_sorter_RANK) {
}
{
- auto st = sorter->prepare_for_read();
+ auto st = sorter->prepare_for_read(false);
EXPECT_TRUE(st.ok()) << st.msg();
}
{
diff --git a/be/test/vec/exec/sort/sort_test.cpp
b/be/test/vec/exec/sort/sort_test.cpp
index 505c0076174..ceea6bb3bf6 100644
--- a/be/test/vec/exec/sort/sort_test.cpp
+++ b/be/test/vec/exec/sort/sort_test.cpp
@@ -87,7 +87,7 @@ public:
EXPECT_TRUE(sorter->append_block(&block).ok());
}
- void prepare_for_read() { EXPECT_TRUE(sorter->prepare_for_read().ok()); }
+ void prepare_for_read() {
EXPECT_TRUE(sorter->prepare_for_read(false).ok()); }
void check_sort_column(ColumnPtr column) {
MutableBlock
sorted_block(VectorizedUtils::create_columns_with_type_and_name(*row_desc));
diff --git a/regression-test/data/query_p0/sort_spill/sort_spill.out
b/regression-test/data/query_p0/sort_spill/sort_spill.out
new file mode 100644
index 00000000000..00c0dd6f19a
Binary files /dev/null and
b/regression-test/data/query_p0/sort_spill/sort_spill.out differ
diff --git a/regression-test/suites/query_p0/sort_spill/sort_spill.groovy
b/regression-test/suites/query_p0/sort_spill/sort_spill.groovy
new file mode 100644
index 00000000000..13f7cfa6fa9
--- /dev/null
+++ b/regression-test/suites/query_p0/sort_spill/sort_spill.groovy
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("sort_spill") {
+ sql """
+ drop table if exists d_table;
+ """
+ sql """
+ create table d_table(
+ k1 int,
+ k2 int,
+ ) distributed by random buckets 10
+ properties ("replication_num"="1");
+ """
+ sql """
+ insert into d_table select e1,e1 from (select 1 k1) as t lateral view
explode_numbers(10000) tmp1 as e1;
+ """
+ sql """ set parallel_pipeline_task_num = 2; """
+ sql """ set batch_size = 100; """
+ sql """ set enable_force_spill=true; """
+ sql """ set enable_topn_lazy_materialization=false;"""
+ sql """ set enable_reserve_memory=true; """
+ sql """ set force_sort_algorithm = "full"; """
+ sql """ set enable_parallel_result_sink=true; """
+ qt_select_1 "select k1,row_number () over (ORDER BY k2 DESC) from d_table
order by k1 limit 10 offset 9900;"
+ qt_select_2 "select k1,row_number () over (ORDER BY k2 DESC) from d_table
order by k1 limit 10;"
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]