This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 51a6b14eb6c [refactor](merger) Simplify sort merger (#47689)
51a6b14eb6c is described below
commit 51a6b14eb6cf0e8edebc2d298b9549189ab891e9
Author: Gabriel <[email protected]>
AuthorDate: Tue Feb 11 14:13:20 2025 +0800
[refactor](merger) Simplify sort merger (#47689)
1. Delete unused variable in `VSortExecExprs`.
2. Decouple the sort operator from the local exchange operator.
3. Use the local exchange's profile to collect the sort merger's metrics
instead of the sort operator's.
---
be/src/pipeline/exec/exchange_source_operator.cpp | 2 +-
be/src/pipeline/exec/sort_source_operator.cpp | 13 -----------
be/src/pipeline/exec/sort_source_operator.h | 5 ++--
be/src/pipeline/local_exchange/local_exchanger.cpp | 10 ++++++--
be/src/pipeline/local_exchange/local_exchanger.h | 16 +++++++++----
be/src/pipeline/pipeline_fragment_context.cpp | 5 +++-
be/src/vec/common/sort/heap_sorter.cpp | 4 ++--
be/src/vec/common/sort/sorter.cpp | 4 ++--
be/src/vec/common/sort/vsort_exec_exprs.cpp | 27 +++++-----------------
be/src/vec/common/sort/vsort_exec_exprs.h | 16 +++----------
be/test/vec/exec/sort/sort_test.cpp | 2 +-
11 files changed, 40 insertions(+), 64 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp
b/be/src/pipeline/exec/exchange_source_operator.cpp
index 4f12a8ef38e..762e108fd48 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -151,7 +151,7 @@ Status ExchangeSourceOperatorX::get_block(RuntimeState*
state, vectorized::Block
if (_is_merging && !local_state.is_ready) {
SCOPED_TIMER(local_state.create_merger_timer);
RETURN_IF_ERROR(local_state.stream_recvr->create_merger(
- local_state.vsort_exec_exprs.lhs_ordering_expr_ctxs(),
_is_asc_order, _nulls_first,
+ local_state.vsort_exec_exprs.ordering_expr_ctxs(),
_is_asc_order, _nulls_first,
state->batch_size(), _limit, _offset));
local_state.is_ready = true;
return Status::OK();
diff --git a/be/src/pipeline/exec/sort_source_operator.cpp
b/be/src/pipeline/exec/sort_source_operator.cpp
index 2fb09d7278f..808f6533d6e 100644
--- a/be/src/pipeline/exec/sort_source_operator.cpp
+++ b/be/src/pipeline/exec/sort_source_operator.cpp
@@ -67,18 +67,5 @@ const vectorized::SortDescription&
SortSourceOperatorX::get_sort_description(
return local_state._shared_state->sorter->get_sort_description();
}
-Status SortSourceOperatorX::build_merger(RuntimeState* state,
-
std::unique_ptr<vectorized::VSortedRunMerger>& merger,
- RuntimeProfile* profile) {
- // now only use in LocalMergeSortExchanger::get_block
- vectorized::VSortExecExprs vsort_exec_exprs;
- // clone vsort_exec_exprs in LocalMergeSortExchanger
- RETURN_IF_ERROR(_vsort_exec_exprs.clone(state, vsort_exec_exprs));
- merger = std::make_unique<vectorized::VSortedRunMerger>(
- vsort_exec_exprs.lhs_ordering_expr_ctxs(), _is_asc_order,
_nulls_first,
- state->batch_size(), _limit, _offset, profile);
- return Status::OK();
-}
-
#include "common/compile_check_end.h"
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/sort_source_operator.h
b/be/src/pipeline/exec/sort_source_operator.h
index a638b04b368..7902e4815bf 100644
--- a/be/src/pipeline/exec/sort_source_operator.h
+++ b/be/src/pipeline/exec/sort_source_operator.h
@@ -56,11 +56,10 @@ public:
bool use_local_merge() const { return _merge_by_exchange; }
const vectorized::SortDescription& get_sort_description(RuntimeState*
state) const;
- Status build_merger(RuntimeState* state,
std::unique_ptr<vectorized::VSortedRunMerger>& merger,
- RuntimeProfile* profile);
-
private:
+ friend class PipelineFragmentContext;
friend class SortLocalState;
+
const bool _merge_by_exchange;
std::vector<bool> _is_asc_order;
std::vector<bool> _nulls_first;
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp
b/be/src/pipeline/local_exchange/local_exchanger.cpp
index c768668acc4..0fb4db625a5 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -20,7 +20,6 @@
#include "common/cast_set.h"
#include "common/status.h"
#include "pipeline/exec/sort_sink_operator.h"
-#include "pipeline/exec/sort_source_operator.h"
#include "pipeline/local_exchange/local_exchange_sink_operator.h"
#include "pipeline/local_exchange/local_exchange_source_operator.h"
#include "vec/runtime/partitioner.h"
@@ -410,7 +409,14 @@ void LocalMergeSortExchanger::finalize() {
Status LocalMergeSortExchanger::build_merger(RuntimeState* state,
LocalExchangeSourceLocalState*
local_state) {
- RETURN_IF_ERROR(_sort_source->build_merger(state, _merger,
local_state->profile()));
+ vectorized::VExprContextSPtrs ordering_expr_ctxs;
+ ordering_expr_ctxs.resize(_merge_info.ordering_expr_ctxs.size());
+ for (size_t i = 0; i < ordering_expr_ctxs.size(); i++) {
+ RETURN_IF_ERROR(_merge_info.ordering_expr_ctxs[i]->clone(state,
ordering_expr_ctxs[i]));
+ }
+ _merger = std::make_unique<vectorized::VSortedRunMerger>(
+ ordering_expr_ctxs, _merge_info.is_asc_order,
_merge_info.nulls_first,
+ state->batch_size(), _merge_info.limit, _merge_info.offset,
local_state->profile());
std::vector<vectorized::BlockSupplier> child_block_suppliers;
for (int channel_id = 0; channel_id < _num_partitions; channel_id++) {
vectorized::BlockSupplier block_supplier = [&, local_state, id =
channel_id](
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h
b/be/src/pipeline/local_exchange/local_exchanger.h
index 90edeca07e8..7f87289e413 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -29,7 +29,6 @@ namespace pipeline {
class LocalExchangeSourceLocalState;
class LocalExchangeSinkLocalState;
class BlockWrapper;
-class SortSourceOperatorX;
struct Profile {
RuntimeProfile::Counter* compute_hash_value_timer = nullptr;
@@ -335,11 +334,18 @@ public:
class LocalMergeSortExchanger final : public Exchanger<BlockWrapperSPtr> {
public:
+ struct MergeInfo {
+ const std::vector<bool>& is_asc_order;
+ const std::vector<bool>& nulls_first;
+ const int64_t limit;
+ const int64_t offset;
+ const vectorized::VExprContextSPtrs& ordering_expr_ctxs;
+ };
ENABLE_FACTORY_CREATOR(LocalMergeSortExchanger);
- LocalMergeSortExchanger(std::shared_ptr<SortSourceOperatorX> sort_source,
- int running_sink_operators, int num_partitions,
int free_block_limit)
+ LocalMergeSortExchanger(MergeInfo&& merge_info, int
running_sink_operators, int num_partitions,
+ int free_block_limit)
: Exchanger<BlockWrapperSPtr>(running_sink_operators,
num_partitions, free_block_limit),
- _sort_source(std::move(sort_source)) {}
+ _merge_info(std::move(merge_info)) {}
~LocalMergeSortExchanger() override = default;
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
Profile&& profile,
SinkInfo&& sink_info) override;
@@ -355,7 +361,7 @@ public:
private:
std::unique_ptr<vectorized::VSortedRunMerger> _merger;
- std::shared_ptr<SortSourceOperatorX> _sort_source;
+ MergeInfo _merge_info;
std::vector<std::atomic_int64_t> _queues_mem_usege;
};
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 30f1a437ff0..ac45128ad5f 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -818,7 +818,10 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
child_op->get_name());
}
shared_state->exchanger = LocalMergeSortExchanger::create_unique(
- sort_source, cur_pipe->num_tasks(), _num_instances,
+ LocalMergeSortExchanger::MergeInfo {
+ sort_source->_is_asc_order, sort_source->_nulls_first,
sort_source->_limit,
+ sort_source->_offset,
sort_source->_vsort_exec_exprs.ordering_expr_ctxs()},
+ cur_pipe->num_tasks(), _num_instances,
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
? cast_set<int>(
_runtime_state->query_options().local_exchange_free_blocks_limit)
diff --git a/be/src/vec/common/sort/heap_sorter.cpp
b/be/src/vec/common/sort/heap_sorter.cpp
index 01db368e980..f9e3f28cd93 100644
--- a/be/src/vec/common/sort/heap_sorter.cpp
+++ b/be/src/vec/common/sort/heap_sorter.cpp
@@ -195,9 +195,9 @@ void HeapSorter::_do_filter(HeapSortCursorBlockView&
block_view, size_t num_rows
}
Status HeapSorter::_prepare_sort_descs(Block* block) {
-
_sort_description.resize(_vsort_exec_exprs.lhs_ordering_expr_ctxs().size());
+ _sort_description.resize(_vsort_exec_exprs.ordering_expr_ctxs().size());
for (int i = 0; i < _sort_description.size(); i++) {
- const auto& ordering_expr =
_vsort_exec_exprs.lhs_ordering_expr_ctxs()[i];
+ const auto& ordering_expr = _vsort_exec_exprs.ordering_expr_ctxs()[i];
RETURN_IF_ERROR(ordering_expr->execute(block,
&_sort_description[i].column_number));
_sort_description[i].direction = _is_asc_order[i] ? 1 : -1;
diff --git a/be/src/vec/common/sort/sorter.cpp
b/be/src/vec/common/sort/sorter.cpp
index f491311a8f7..82b59cd6717 100644
--- a/be/src/vec/common/sort/sorter.cpp
+++ b/be/src/vec/common/sort/sorter.cpp
@@ -180,10 +180,10 @@ Status Sorter::partial_sort(Block& src_block, Block&
dest_block) {
dest_block.swap(new_block);
}
-
_sort_description.resize(_vsort_exec_exprs.lhs_ordering_expr_ctxs().size());
+ _sort_description.resize(_vsort_exec_exprs.ordering_expr_ctxs().size());
Block* result_block = _materialize_sort_exprs ? &dest_block : &src_block;
for (int i = 0; i < _sort_description.size(); i++) {
- const auto& ordering_expr =
_vsort_exec_exprs.lhs_ordering_expr_ctxs()[i];
+ const auto& ordering_expr = _vsort_exec_exprs.ordering_expr_ctxs()[i];
RETURN_IF_ERROR(ordering_expr->execute(result_block,
&_sort_description[i].column_number));
_sort_description[i].direction = _is_asc_order[i] ? 1 : -1;
diff --git a/be/src/vec/common/sort/vsort_exec_exprs.cpp
b/be/src/vec/common/sort/vsort_exec_exprs.cpp
index cb3aaa6d654..4f5e44a12bd 100644
--- a/be/src/vec/common/sort/vsort_exec_exprs.cpp
+++ b/be/src/vec/common/sort/vsort_exec_exprs.cpp
@@ -48,7 +48,7 @@ Status VSortExecExprs::init(const TSortInfo& sort_info,
ObjectPool* pool) {
Status VSortExecExprs::init(const std::vector<TExpr>& ordering_exprs,
const std::vector<TExpr>* sort_tuple_slot_exprs,
ObjectPool* pool) {
- RETURN_IF_ERROR(VExpr::create_expr_trees(ordering_exprs,
_lhs_ordering_expr_ctxs));
+ RETURN_IF_ERROR(VExpr::create_expr_trees(ordering_exprs,
_ordering_expr_ctxs));
if (sort_tuple_slot_exprs != NULL) {
_materialize_tuple = true;
RETURN_IF_ERROR(
@@ -59,19 +59,12 @@ Status VSortExecExprs::init(const std::vector<TExpr>&
ordering_exprs,
return Status::OK();
}
-Status VSortExecExprs::init(const VExprContextSPtrs& lhs_ordering_expr_ctxs,
- const VExprContextSPtrs& rhs_ordering_expr_ctxs) {
- _lhs_ordering_expr_ctxs = lhs_ordering_expr_ctxs;
- _rhs_ordering_expr_ctxs = rhs_ordering_expr_ctxs;
- return Status::OK();
-}
-
Status VSortExecExprs::prepare(RuntimeState* state, const RowDescriptor&
child_row_desc,
const RowDescriptor& output_row_desc) {
if (_materialize_tuple) {
RETURN_IF_ERROR(VExpr::prepare(_sort_tuple_slot_expr_ctxs, state,
child_row_desc));
}
- RETURN_IF_ERROR(VExpr::prepare(_lhs_ordering_expr_ctxs, state,
output_row_desc));
+ RETURN_IF_ERROR(VExpr::prepare(_ordering_expr_ctxs, state,
output_row_desc));
return Status::OK();
}
@@ -79,24 +72,16 @@ Status VSortExecExprs::open(RuntimeState* state) {
if (_materialize_tuple) {
RETURN_IF_ERROR(VExpr::open(_sort_tuple_slot_expr_ctxs, state));
}
- RETURN_IF_ERROR(VExpr::open(_lhs_ordering_expr_ctxs, state));
- RETURN_IF_ERROR(
- VExpr::clone_if_not_exists(_lhs_ordering_expr_ctxs, state,
_rhs_ordering_expr_ctxs));
+ RETURN_IF_ERROR(VExpr::open(_ordering_expr_ctxs, state));
return Status::OK();
}
void VSortExecExprs::close(RuntimeState* state) {}
Status VSortExecExprs::clone(RuntimeState* state, VSortExecExprs& new_exprs) {
- new_exprs._lhs_ordering_expr_ctxs.resize(_lhs_ordering_expr_ctxs.size());
- new_exprs._rhs_ordering_expr_ctxs.resize(_rhs_ordering_expr_ctxs.size());
- for (size_t i = 0; i < _lhs_ordering_expr_ctxs.size(); i++) {
- RETURN_IF_ERROR(
- _lhs_ordering_expr_ctxs[i]->clone(state,
new_exprs._lhs_ordering_expr_ctxs[i]));
- }
- for (size_t i = 0; i < _rhs_ordering_expr_ctxs.size(); i++) {
- RETURN_IF_ERROR(
- _rhs_ordering_expr_ctxs[i]->clone(state,
new_exprs._rhs_ordering_expr_ctxs[i]));
+ new_exprs._ordering_expr_ctxs.resize(_ordering_expr_ctxs.size());
+ for (size_t i = 0; i < _ordering_expr_ctxs.size(); i++) {
+ RETURN_IF_ERROR(_ordering_expr_ctxs[i]->clone(state,
new_exprs._ordering_expr_ctxs[i]));
}
new_exprs._sort_tuple_slot_expr_ctxs.resize(_sort_tuple_slot_expr_ctxs.size());
for (size_t i = 0; i < _sort_tuple_slot_expr_ctxs.size(); i++) {
diff --git a/be/src/vec/common/sort/vsort_exec_exprs.h
b/be/src/vec/common/sort/vsort_exec_exprs.h
index 8b6a11f9a33..c92c20f9e7d 100644
--- a/be/src/vec/common/sort/vsort_exec_exprs.h
+++ b/be/src/vec/common/sort/vsort_exec_exprs.h
@@ -58,10 +58,7 @@ public:
}
// Can only be used after calling prepare()
- const VExprContextSPtrs& lhs_ordering_expr_ctxs() const { return
_lhs_ordering_expr_ctxs; }
-
- // Can only be used after calling open()
- const VExprContextSPtrs& rhs_ordering_expr_ctxs() const { return
_rhs_ordering_expr_ctxs; }
+ const VExprContextSPtrs& ordering_expr_ctxs() const { return
_ordering_expr_ctxs; }
bool need_materialize_tuple() const { return _materialize_tuple; }
@@ -73,8 +70,7 @@ public:
private:
// Create two VExprContexts for evaluating over the TupleRows.
- VExprContextSPtrs _lhs_ordering_expr_ctxs;
- VExprContextSPtrs _rhs_ordering_expr_ctxs;
+ VExprContextSPtrs _ordering_expr_ctxs;
// If true, the tuples to be sorted are materialized by
// _sort_tuple_slot_exprs before the actual sort is performed.
@@ -85,16 +81,10 @@ private:
// _materialize_tuple is true.
VExprContextSPtrs _sort_tuple_slot_expr_ctxs;
- // for some reason, _sort_tuple_slot_expr_ctxs is not-null but
_lhs_ordering_expr_ctxs is nullable
+ // for some reason, _sort_tuple_slot_expr_ctxs is not-null but
_ordering_expr_ctxs is nullable
// this flag list would be used to convert column to nullable.
std::vector<bool> _need_convert_to_nullable_flags;
- // Initialize directly from already-created VExprContexts. Callers should
manually call
- // Prepare(), Open(), and Close() on input VExprContexts (instead of
calling the
- // analogous functions in this class). Used for testing.
- Status init(const VExprContextSPtrs& lhs_ordering_expr_ctxs,
- const VExprContextSPtrs& rhs_ordering_expr_ctxs);
-
// Initialize the ordering and (optionally) materialization expressions
from the thrift
// TExprs into the specified pool. sort_tuple_slot_exprs is NULL if the
tuple is not
// materialized.
diff --git a/be/test/vec/exec/sort/sort_test.cpp
b/be/test/vec/exec/sort/sort_test.cpp
index a9e06507f40..e774bcdb037 100644
--- a/be/test/vec/exec/sort/sort_test.cpp
+++ b/be/test/vec/exec/sort/sort_test.cpp
@@ -59,7 +59,7 @@ public:
sort_exec_exprs._materialize_tuple = false;
- sort_exec_exprs._lhs_ordering_expr_ctxs.push_back(
+ sort_exec_exprs._ordering_expr_ctxs.push_back(
VExprContext::create_shared(std::make_shared<MockSlotRef>(0)));
switch (sort_type) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]