This is an automated email from the ASF dual-hosted git repository.
zclllyybb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 57672d180d2 [fix](be) Continue sorted merge when sender queue is ready (#65004)
57672d180d2 is described below
commit 57672d180d229d80617be96284e12c6bc03619d2
Author: HappenLee <[email protected]>
AuthorDate: Wed Jul 1 17:18:58 2026 +0800
[fix](be) Continue sorted merge when sender queue is ready (#65004)
### What problem does this PR solve?
When a sorted-run merge cursor reaches the end of its current block and
the sender already has the next block ready, the merger may fetch the
next block before flushing rows that have already been selected into the
output block.
For variable-length columns, the pending row addresses still point to
the previous cursor block. Reusing the cursor block before `do_insert()`
can make the output read rows from the wrong block and corrupt string
offsets.
### What is changed?
Flush pending output rows before loading the next ready block from the
exhausted cursor.
This keeps the existing non-ready path behavior unchanged: when the next
block is not ready, the cursor is saved as pending and the current
output block is returned first.
The PR also adds regression coverage for:
- continuing the merge when the next sender block is already ready
- preserving string column values before the cursor block is reused
- keeping the cursor pending when the next sender block is not ready
---
be/src/exec/exchange/vdata_stream_recvr.cpp | 12 +-
be/src/exec/exchange/vdata_stream_recvr.h | 2 +
be/src/exec/sort/sort_cursor.h | 22 ++-
be/src/exec/sort/vsorted_run_merger.cpp | 34 ++++-
be/src/exec/sort/vsorted_run_merger.h | 8 +-
be/test/core/value/sort_merger_test.cpp | 201 +++++++++++++++++++++++++++-
6 files changed, 264 insertions(+), 15 deletions(-)
diff --git a/be/src/exec/exchange/vdata_stream_recvr.cpp
b/be/src/exec/exchange/vdata_stream_recvr.cpp
index 7b77c24ecbd..f54baf2221d 100644
--- a/be/src/exec/exchange/vdata_stream_recvr.cpp
+++ b/be/src/exec/exchange/vdata_stream_recvr.cpp
@@ -113,6 +113,11 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block*
block, bool* eos) {
return Status::OK();
}
+bool VDataStreamRecvr::SenderQueue::has_data_or_finished() { // Readiness probe bound as this queue's BlockSupplierReadyChecker for the sorted-run merger; it does not wait for data.
+ std::lock_guard<std::mutex> l(_lock); // read the queue state under _lock
+ return _is_cancelled || !_block_queue.empty() || _num_remaining_senders ==
0; // true if cancelled, a block is buffered, or no senders remain
+}
+
void
VDataStreamRecvr::SenderQueue::set_source_ready(std::lock_guard<std::mutex>&) {
// Here, it is necessary to check if _source_dependency is not nullptr.
// This is because the queue might be closed before setting the source
dependency.
@@ -431,16 +436,21 @@ Status VDataStreamRecvr::create_merger(const
VExprContextSPtrs& ordering_expr,
DCHECK(_is_merging);
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
std::vector<BlockSupplier> child_block_suppliers;
+ std::vector<BlockSupplierReadyChecker> child_block_supplier_ready_checkers;
// Create the merger that will a single stream of sorted rows.
_merger.reset(new VSortedRunMerger(ordering_expr, is_asc_order,
nulls_first, batch_size, limit,
offset, _profile));
+ child_block_suppliers.reserve(_sender_queues.size());
+ child_block_supplier_ready_checkers.reserve(_sender_queues.size());
for (int i = 0; i < _sender_queues.size(); ++i) {
child_block_suppliers.emplace_back(std::bind(std::mem_fn(&SenderQueue::get_batch),
_sender_queues[i],
std::placeholders::_1,
std::placeholders::_2));
+ child_block_supplier_ready_checkers.emplace_back(
+ std::bind(std::mem_fn(&SenderQueue::has_data_or_finished),
_sender_queues[i]));
}
- RETURN_IF_ERROR(_merger->prepare(child_block_suppliers));
+ RETURN_IF_ERROR(_merger->prepare(child_block_suppliers,
child_block_supplier_ready_checkers));
return Status::OK();
}
diff --git a/be/src/exec/exchange/vdata_stream_recvr.h
b/be/src/exec/exchange/vdata_stream_recvr.h
index 12650e780a1..ccdb849c576 100644
--- a/be/src/exec/exchange/vdata_stream_recvr.h
+++ b/be/src/exec/exchange/vdata_stream_recvr.h
@@ -181,6 +181,8 @@ public:
Status get_batch(Block* next_block, bool* eos);
+ bool has_data_or_finished();
+
Status add_block(std::unique_ptr<PBlock> pblock, int be_number, int64_t
packet_seq,
::google::protobuf::Closure** done, const int64_t
wait_for_worker,
const uint64_t time_to_find_recvr);
diff --git a/be/src/exec/sort/sort_cursor.h b/be/src/exec/sort/sort_cursor.h
index a13906c2be5..c0ae4a02edd 100644
--- a/be/src/exec/sort/sort_cursor.h
+++ b/be/src/exec/sort/sort_cursor.h
@@ -22,6 +22,7 @@
#include <glog/logging.h>
+#include <functional>
#include <utility>
#include "core/block/block.h"
@@ -107,6 +108,7 @@ struct MergeSortCursorImpl {
virtual void process_next() {}
virtual Block* block_ptr() { return nullptr; }
virtual bool eof() const { return false; }
+ virtual bool has_ready_block_or_eos() const { return false; }
Field get_top_value() const {
Field field {PrimitiveType::TYPE_NULL};
@@ -116,14 +118,18 @@ struct MergeSortCursorImpl {
};
using BlockSupplier = std::function<Status(Block*, bool* eos)>;
+using BlockSupplierReadyChecker = std::function<bool()>;
struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl {
ENABLE_FACTORY_CREATOR(BlockSupplierSortCursorImpl);
BlockSupplierSortCursorImpl(BlockSupplier block_supplier,
const VExprContextSPtrs& ordering_expr,
const std::vector<bool>& is_asc_order,
- const std::vector<bool>& nulls_first)
- : _ordering_expr(ordering_expr),
_block_supplier(std::move(block_supplier)) {
+ const std::vector<bool>& nulls_first,
+ BlockSupplierReadyChecker
block_supplier_ready_checker = {})
+ : _ordering_expr(ordering_expr),
+ _block_supplier(std::move(block_supplier)),
+
_block_supplier_ready_checker(std::move(block_supplier_ready_checker)) {
block = Block::create_shared();
sort_columns_size = ordering_expr.size();
@@ -135,11 +141,18 @@ struct BlockSupplierSortCursorImpl : public
MergeSortCursorImpl {
process_next();
}
- BlockSupplierSortCursorImpl(BlockSupplier block_supplier, const
SortDescription& desc_)
- : MergeSortCursorImpl(desc_),
_block_supplier(std::move(block_supplier)) {
+ BlockSupplierSortCursorImpl(BlockSupplier block_supplier, const
SortDescription& desc_,
+ BlockSupplierReadyChecker
block_supplier_ready_checker = {})
+ : MergeSortCursorImpl(desc_),
+ _block_supplier(std::move(block_supplier)),
+
_block_supplier_ready_checker(std::move(block_supplier_ready_checker)) {
process_next();
}
+ bool has_ready_block_or_eos() const override {
+ return _block_supplier_ready_checker &&
_block_supplier_ready_checker();
+ }
+
void process_next() override {
if (_is_eof) {
return;
@@ -166,6 +179,7 @@ struct BlockSupplierSortCursorImpl : public
MergeSortCursorImpl {
VExprContextSPtrs _ordering_expr;
BlockSupplier _block_supplier;
+ BlockSupplierReadyChecker _block_supplier_ready_checker;
bool _is_eof = false;
};
diff --git a/be/src/exec/sort/vsorted_run_merger.cpp
b/be/src/exec/sort/vsorted_run_merger.cpp
index 8323490031d..353fb949656 100644
--- a/be/src/exec/sort/vsorted_run_merger.cpp
+++ b/be/src/exec/sort/vsorted_run_merger.cpp
@@ -61,14 +61,22 @@ void VSortedRunMerger::init_timers(RuntimeProfile* profile)
{
_get_next_block_timer = ADD_TIMER(profile, "MergeGetNextBlock");
}
-Status VSortedRunMerger::prepare(const std::vector<BlockSupplier>& input_runs)
{
+Status VSortedRunMerger::prepare(
+ const std::vector<BlockSupplier>& input_runs,
+ const std::vector<BlockSupplierReadyChecker>&
input_run_ready_checkers) {
try {
- for (const auto& supplier : input_runs) {
+ for (size_t i = 0; i < input_runs.size(); ++i) {
+ const auto& supplier = input_runs[i];
+ BlockSupplierReadyChecker ready_checker;
+ if (i < input_run_ready_checkers.size()) {
+ ready_checker = input_run_ready_checkers[i];
+ }
if (_use_sort_desc) {
-
_cursors.emplace_back(BlockSupplierSortCursorImpl::create_shared(supplier,
_desc));
+ _cursors.emplace_back(
+ BlockSupplierSortCursorImpl::create_shared(supplier,
_desc, ready_checker));
} else {
_cursors.emplace_back(BlockSupplierSortCursorImpl::create_shared(
- supplier, _ordering_expr, _is_asc_order,
_nulls_first));
+ supplier, _ordering_expr, _is_asc_order, _nulls_first,
ready_checker));
}
}
} catch (const std::exception& e) {
@@ -193,7 +201,12 @@ Status VSortedRunMerger::get_next(Block* output_block,
bool* eos) {
}
current->next();
- if (_need_more_data(current)) {
+ const bool has_ready_block_or_eos =
+ current->is_last(0) && !current->eof() &&
current->has_ready_block_or_eos();
+ if (has_ready_block_or_eos) {
+ do_insert();
+ }
+ if (_need_more_data(current, has_ready_block_or_eos)) {
do_insert();
scoped_mutable_block.restore();
return Status::OK();
@@ -211,12 +224,21 @@ Status VSortedRunMerger::get_next(Block* output_block,
bool* eos) {
return Status::OK();
}
-bool VSortedRunMerger::_need_more_data(MergeSortCursor& current) {
+bool VSortedRunMerger::_need_more_data(MergeSortCursor& current, bool
has_ready_block_or_eos) {
if (!current->is_last(0)) {
_priority_queue.push(current);
return false;
} else if (current->eof()) {
return false;
+ } else if (has_ready_block_or_eos) {
+ {
+ ScopedTimer<MonotonicStopWatch> timer(_get_next_block_timer);
+ current->process_next();
+ }
+ if (!current->eof()) {
+ _priority_queue.push(current);
+ }
+ return false;
} else {
_pending_cursor = current.impl;
return true;
diff --git a/be/src/exec/sort/vsorted_run_merger.h
b/be/src/exec/sort/vsorted_run_merger.h
index 191db39f295..11a20a490c4 100644
--- a/be/src/exec/sort/vsorted_run_merger.h
+++ b/be/src/exec/sort/vsorted_run_merger.h
@@ -55,7 +55,8 @@ public:
// Prepare this merger to merge and return rows from the sorted runs in
'input_runs'.
// Retrieves the first batch from each run and sets up the binary heap
implementing
// the priority queue.
- Status prepare(const std::vector<BlockSupplier>& input_runs);
+ Status prepare(const std::vector<BlockSupplier>& input_runs,
+ const std::vector<BlockSupplierReadyChecker>&
input_run_ready_checkers = {});
// Return the next block of sorted rows from this merger.
Status get_next(Block* output_block, bool* eos);
@@ -91,8 +92,9 @@ protected:
private:
void init_timers(RuntimeProfile* profile);
- // If current stream is exhausted and not eof, we should break this loop
and read more blocks.
- bool _need_more_data(MergeSortCursor& current);
+ // If current stream is exhausted and not eof, keep merging only when the
next block or eos is
+ // already available. Otherwise break this loop and wait for more data.
+ bool _need_more_data(MergeSortCursor& current, bool
has_ready_block_or_eos);
};
} // namespace doris
diff --git a/be/test/core/value/sort_merger_test.cpp
b/be/test/core/value/sort_merger_test.cpp
index b5c0b8ced95..75a95db7c9b 100644
--- a/be/test/core/value/sort_merger_test.cpp
+++ b/be/test/core/value/sort_merger_test.cpp
@@ -17,8 +17,13 @@
#include <gtest/gtest.h>
+#include <cstdint>
+#include <string>
+#include <vector>
+
#include "core/data_type/data_type_nullable.h"
#include "core/data_type/data_type_number.h"
+#include "core/data_type/data_type_string.h"
#include "exec/sort/vsorted_run_merger.h"
#include "testutil/column_helper.h"
#include "testutil/mock/mock_slot_ref.h"
@@ -33,6 +38,16 @@ public:
void TearDown() override {}
};
+static Block create_int_string_block(const std::vector<int64_t>& keys,
+ const std::vector<std::string>& values) { // Test helper: builds a block with an Int64 "key" column and a String "value" column.
+ DCHECK_EQ(keys.size(), values.size()); // both columns must have the same row count
+ auto key_column = ColumnHelper::create_column<DataTypeInt64>(keys);
+ auto value_column = ColumnHelper::create_column<DataTypeString>(values);
+ Block block({ColumnWithTypeAndName(key_column,
std::make_shared<DataTypeInt64>(), "key"), // column 0: sort key
+ ColumnWithTypeAndName(value_column,
std::make_shared<DataTypeString>(), "value")}); // column 1: variable-length payload
+ return block;
+}
+
TEST(SortMergerTest, NULL_FIRST_ASC) {
/**
* in: [([NULL, 1, 2, 3, 4], eos = false), ([], eos = true)]
@@ -478,4 +493,188 @@ TEST(SortMergerTest, TEST_SINGLE_STREAM) {
}
}
-} // namespace doris
\ No newline at end of file
+TEST(SortMergerTest, CONTINUE_MERGE_WHEN_NEXT_BLOCK_READY) { // Every child always reports ready, so a cursor exhausted mid-batch should fetch its next block instead of ending the batch early.
+ const int batch_size = 4;
+ std::vector<std::vector<std::vector<int64_t>>> input_blocks = {{{1}, {3}},
{{2}, {4}}}; // child 0 yields {1} then {3}; child 1 yields {2} then {4}
+ std::vector<size_t> next_block_index(input_blocks.size(), 0); // per-child index of the next block to hand out
+
+ std::unique_ptr<VSortedRunMerger> merger;
+ auto profile = std::make_shared<RuntimeProfile>("");
+ auto ordering_expr =
MockSlotRef::create_mock_contexts(std::make_shared<DataTypeInt64>());
+ {
+ std::vector<bool> is_asc_order = {true};
+ std::vector<bool> nulls_first = {true};
+ const int limit = -1;
+ const int offset = 0;
+ merger.reset(new VSortedRunMerger(ordering_expr, is_asc_order,
nulls_first, batch_size,
+ limit, offset, profile.get()));
+ }
+ {
+ std::vector<BlockSupplier> child_block_suppliers;
+ std::vector<BlockSupplierReadyChecker> ready_checkers;
+ for (size_t child_idx = 0; child_idx < input_blocks.size();
child_idx++) {
+ BlockSupplier block_supplier = [&, id = child_idx](Block* block,
bool* eos) { // serves this child's blocks in order, then reports eos
+ if (next_block_index[id] >= input_blocks[id].size()) {
+ *eos = true;
+ return Status::OK();
+ }
+ *block = ColumnHelper::create_block<DataTypeInt64>(
+ input_blocks[id][next_block_index[id]++]);
+ *eos = false;
+ return Status::OK();
+ };
+ child_block_suppliers.push_back(block_supplier);
+ ready_checkers.emplace_back([] { return true; }); // always ready: the next block (or eos) is available immediately
+ }
+ EXPECT_TRUE(merger->prepare(child_block_suppliers,
ready_checkers).ok());
+ }
+ { // all four rows arrive in one batch even though each input block holds a single row
+ Block block;
+ bool eos = false;
+ EXPECT_TRUE(merger->get_next(&block, &eos).ok());
+ auto expect_block = ColumnHelper::create_column<DataTypeInt64>({1, 2,
3, 4});
+
EXPECT_TRUE(ColumnHelper::column_equal(block.get_by_position(0).column,
expect_block));
+ EXPECT_EQ(block.rows(), batch_size);
+ EXPECT_FALSE(eos);
+ }
+ { // both suppliers are exhausted, so the next call returns no rows and eos
+ Block block;
+ bool eos = false;
+ EXPECT_TRUE(merger->get_next(&block, &eos).ok());
+ EXPECT_EQ(block.rows(), 0);
+ EXPECT_TRUE(eos);
+ }
+}
+
+TEST(SortMergerTest, CONTINUE_MERGE_WITH_STRING_COLUMN_WHEN_NEXT_BLOCK_READY) { // String values must come from the block each row was selected from, even though child 0's next block is fetched before the first batch is returned.
+ const int batch_size = 4;
+ std::vector<std::vector<Block>> input_blocks;
+ input_blocks.emplace_back(
+ std::vector<Block> {create_int_string_block({1, 3}, {"old-1",
"old-3"}),
+ create_int_string_block({5, 7}, {"new-5",
"new-7"})}); // child 0: {1, 3} then {5, 7}
+ input_blocks.emplace_back(
+ std::vector<Block> {create_int_string_block({2, 4}, {"other-2",
"other-4"})}); // child 1: a single block {2, 4}
+ std::vector<size_t> next_block_index(input_blocks.size(), 0); // per-child index of the next block to hand out
+
+ std::unique_ptr<VSortedRunMerger> merger;
+ auto profile = std::make_shared<RuntimeProfile>("");
+ auto ordering_expr =
MockSlotRef::create_mock_contexts(std::make_shared<DataTypeInt64>());
+ {
+ std::vector<bool> is_asc_order = {true};
+ std::vector<bool> nulls_first = {true};
+ const int limit = -1;
+ const int offset = 0;
+ merger.reset(new VSortedRunMerger(ordering_expr, is_asc_order,
nulls_first, batch_size,
+ limit, offset, profile.get()));
+ }
+ {
+ std::vector<BlockSupplier> child_block_suppliers;
+ std::vector<BlockSupplierReadyChecker> ready_checkers;
+ for (size_t child_idx = 0; child_idx < input_blocks.size();
child_idx++) {
+ BlockSupplier block_supplier = [&, id = child_idx](Block* block,
bool* eos) { // serves this child's blocks in order, then reports eos
+ if (next_block_index[id] >= input_blocks[id].size()) {
+ *eos = true;
+ return Status::OK();
+ }
+ block->swap(input_blocks[id][next_block_index[id]++]); // swap the prepared block into the caller's Block
+ *eos = false;
+ return Status::OK();
+ };
+ child_block_suppliers.push_back(block_supplier);
+ ready_checkers.emplace_back([] { return true; }); // always ready: the next block (or eos) is available immediately
+ }
+ EXPECT_TRUE(merger->prepare(child_block_suppliers,
ready_checkers).ok());
+ }
+ { // first batch: "old-*" values from child 0's first block interleaved with child 1's "other-*" values
+ Block block;
+ bool eos = false;
+ EXPECT_TRUE(merger->get_next(&block, &eos).ok());
+ auto expect_key_column =
ColumnHelper::create_column<DataTypeInt64>({1, 2, 3, 4});
+ auto expect_value_column = ColumnHelper::create_column<DataTypeString>(
+ {"old-1", "other-2", "old-3", "other-4"});
+
EXPECT_TRUE(ColumnHelper::column_equal(block.get_by_position(0).column,
expect_key_column));
+ EXPECT_TRUE(
+ ColumnHelper::column_equal(block.get_by_position(1).column,
expect_value_column));
+ EXPECT_EQ(block.rows(), batch_size);
+ EXPECT_FALSE(eos);
+ }
+ { // second batch: the rows of child 0's second block
+ Block block;
+ bool eos = false;
+ EXPECT_TRUE(merger->get_next(&block, &eos).ok());
+ auto expect_key_column =
ColumnHelper::create_column<DataTypeInt64>({5, 7});
+ auto expect_value_column =
ColumnHelper::create_column<DataTypeString>({"new-5", "new-7"});
+
EXPECT_TRUE(ColumnHelper::column_equal(block.get_by_position(0).column,
expect_key_column));
+ EXPECT_TRUE(
+ ColumnHelper::column_equal(block.get_by_position(1).column,
expect_value_column));
+ EXPECT_EQ(block.rows(), 2);
+ EXPECT_FALSE(eos);
+ }
+}
+
+TEST(SortMergerTest, KEEP_PENDING_CURSOR_WHEN_NEXT_BLOCK_NOT_READY) { // When no child reports ready, an exhausted cursor ends the batch early; merging continues on a later get_next call.
+ const int batch_size = 4;
+ std::vector<std::vector<std::vector<int64_t>>> input_blocks = {{{1}, {3}},
{{2}, {4}}}; // child 0 yields {1} then {3}; child 1 yields {2} then {4}
+ std::vector<size_t> next_block_index(input_blocks.size(), 0); // per-child index of the next block to hand out
+ std::vector<bool> ready(input_blocks.size(), false); // per-child readiness reported by the ready checkers; flipped later in the test
+
+ std::unique_ptr<VSortedRunMerger> merger;
+ auto profile = std::make_shared<RuntimeProfile>("");
+ auto ordering_expr =
MockSlotRef::create_mock_contexts(std::make_shared<DataTypeInt64>());
+ {
+ std::vector<bool> is_asc_order = {true};
+ std::vector<bool> nulls_first = {true};
+ const int limit = -1;
+ const int offset = 0;
+ merger.reset(new VSortedRunMerger(ordering_expr, is_asc_order,
nulls_first, batch_size,
+ limit, offset, profile.get()));
+ }
+ {
+ std::vector<BlockSupplier> child_block_suppliers;
+ std::vector<BlockSupplierReadyChecker> ready_checkers;
+ for (size_t child_idx = 0; child_idx < input_blocks.size();
child_idx++) {
+ BlockSupplier block_supplier = [&, id = child_idx](Block* block,
bool* eos) { // serves this child's blocks in order, then reports eos
+ if (next_block_index[id] >= input_blocks[id].size()) {
+ *eos = true;
+ return Status::OK();
+ }
+ *block = ColumnHelper::create_block<DataTypeInt64>(
+ input_blocks[id][next_block_index[id]++]);
+ *eos = false;
+ return Status::OK();
+ };
+ child_block_suppliers.push_back(block_supplier);
+ ready_checkers.emplace_back([&, id = child_idx] { return
ready[id]; }); // readiness is driven by the ready vector above
+ }
+ EXPECT_TRUE(merger->prepare(child_block_suppliers,
ready_checkers).ok());
+ }
+ { // the first batch stops after row 1: child 0's cursor is exhausted and its next block is not reported ready
+ Block block;
+ bool eos = false;
+ EXPECT_TRUE(merger->get_next(&block, &eos).ok());
+ auto expect_block = ColumnHelper::create_column<DataTypeInt64>({1});
+
EXPECT_TRUE(ColumnHelper::column_equal(block.get_by_position(0).column,
expect_block));
+ EXPECT_EQ(block.rows(), 1);
+ EXPECT_FALSE(eos);
+ }
+ { // once both children report ready, the remaining rows 2, 3, 4 arrive in one batch
+ ready[0] = true;
+ ready[1] = true;
+ Block block;
+ bool eos = false;
+ EXPECT_TRUE(merger->get_next(&block, &eos).ok());
+ auto expect_block = ColumnHelper::create_column<DataTypeInt64>({2, 3,
4});
+
EXPECT_TRUE(ColumnHelper::column_equal(block.get_by_position(0).column,
expect_block));
+ EXPECT_EQ(block.rows(), 3);
+ EXPECT_FALSE(eos);
+ }
+ { // all input has been consumed: no rows and eos
+ Block block;
+ bool eos = false;
+ EXPECT_TRUE(merger->get_next(&block, &eos).ok());
+ EXPECT_EQ(block.rows(), 0);
+ EXPECT_TRUE(eos);
+ }
+}
+
+} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]