This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 173d43cff76 [feature](pipeline)add local merge sort exchanger (#35682)
173d43cff76 is described below
commit 173d43cff76321fd057eba64d1a8122034348480
Author: Mryange <[email protected]>
AuthorDate: Wed Jun 5 20:06:54 2024 +0800
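[feature](pipeline) add local merge sort exchanger (#35682)

Adds a LOCAL_MERGE_SORT local exchange. When a data-stream sink sits directly on top of a sort source that is merged by an exchange, the sorted outputs of the local sort instances are first merged into a single sorted stream before being sent. The downstream global merge then receives one stream per local merge instead of one per sort instance. The behavior is controlled by the new `enable_local_merge_sort` session variable. Example profile: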
```
DATA_STREAM_SINK_OPERATOR (id=2,dst_id=2):
- BlocksProduced: sum 2, avg 1, max 2, min 0
- CloseTime: avg 50.352us, max 59.404us, min 41.300us
- ExecTime: avg 357.197us, max 428.119us, min 286.275us
- InitTime: avg 103.853us, max 104.291us, min 103.416us
- InputRows: sum 60, avg 30, max 60, min 0
- MemoryUsage: sum , avg , max , min
- PeakMemoryUsage: sum 0.00 , avg 0.00 , max 0.00 , min 0.00
- OpenTime: avg 128.654us, max 163.939us, min 93.370us
- RowsProduced: sum 60, avg 30, max 60, min 0
- WaitForDependencyTime: avg 0ns, max 0ns, min 0ns
- WaitForRpcBufferQueue: avg 0ns, max 0ns, min 0ns
LOCAL_EXCHANGE_OPERATOR (LOCAL_MERGE_SORT) (id=-3):
- BlocksProduced: sum 2, avg 1, max 2, min 0
- CloseTime: avg 0ns, max 0ns, min 0ns
- ExecTime: avg 873.747us, max 1.737ms, min 9.663us
- GetBlockFailedTime: sum 0, avg 0, max 0, min 0
- InitTime: avg 466ns, max 495ns, min 437ns
- MemoryUsage: sum , avg , max , min
- PeakMemoryUsage: sum 0.00 , avg 0.00 , max 0.00 , min 0.00
- OpenTime: avg 19.531us, max 31.460us, min 7.602us
- ProjectionTime: avg 0ns, max 0ns, min 0ns
- RowsProduced: sum 60, avg 30, max 60, min 0
- WaitForDependency[LOCAL_EXCHANGE_OPERATOR_DEPENDENCY]Time: avg 14.130ms, max 14.701ms, min 13.558ms
Pipeline : 1(instance_num=2):
LOCAL_EXCHANGE_SINK_OPERATOR (LOCAL_MERGE_SORT) (id=-3):
- CloseTime: avg 0ns, max 0ns, min 0ns
- ExecTime: avg 59.515us, max 68.50us, min 50.981us
- InitTime: avg 8.415us, max 8.695us, min 8.135us
- InputRows: sum 60, avg 30, max 30, min 30
- MemoryUsage: sum , avg , max , min
- PeakMemoryUsage: sum 0.00 , avg 0.00 , max 0.00 , min 0.00
- OpenTime: avg 1.498us, max 1.630us, min 1.366us
- WaitForDependency[LOCAL_EXCHANGE_SINK_DEPENDENCY]Time: avg 0ns, max 0ns, min 0ns
SORT_OPERATOR (id=1 , nereids_id=108):
- PlanInfo
- order by: s_suppkey ASC
- TOPN OPT
- offset: 0
- limit: 30
- BlocksProduced: sum 2, avg 1, max 1, min 1
- CloseTime: avg 0ns, max 0ns, min 0ns
- ExecTime: avg 10.593us, max 18.22us, min 3.165us
- InitTime: avg 0ns, max 0ns, min 0ns
- MemoryUsage: sum , avg , max , min
- PeakMemoryUsage: sum 0.00 , avg 0.00 , max 0.00 , min 0.00
- OpenTime: avg 0ns, max 0ns, min 0ns
- ProjectionTime: avg 0ns, max 0ns, min 0ns
```
---
be/src/pipeline/dependency.h | 4 +
be/src/pipeline/exec/exchange_sink_operator.cpp | 18 +++-
be/src/pipeline/exec/exchange_sink_operator.h | 2 +
be/src/pipeline/exec/operator.h | 1 +
be/src/pipeline/exec/sort_source_operator.cpp | 43 +++++++++-
be/src/pipeline/exec/sort_source_operator.h | 17 ++++
.../local_exchange_sink_operator.cpp | 1 +
.../local_exchange/local_exchange_sink_operator.h | 9 ++
.../local_exchange_source_operator.h | 2 +
be/src/pipeline/local_exchange/local_exchanger.cpp | 95 ++++++++++++++++++++++
be/src/pipeline/local_exchange/local_exchanger.h | 47 +++++++++++
be/src/pipeline/pipeline_fragment_context.cpp | 17 ++++
be/src/runtime/runtime_state.h | 5 ++
be/src/vec/core/sort_cursor.h | 3 +
.../java/org/apache/doris/qe/SessionVariable.java | 6 ++
gensrc/thrift/PaloInternalService.thrift | 2 +
16 files changed, 270 insertions(+), 2 deletions(-)
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 46335caade5..e5a019b4fa0 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -792,6 +792,8 @@ enum class ExchangeType : uint8_t {
ADAPTIVE_PASSTHROUGH = 5,
// Send all data to the first channel.
PASS_TO_ONE = 6,
+ // merge all data to one channel.
+ LOCAL_MERGE_SORT = 7,
};
inline std::string get_exchange_type_name(ExchangeType idx) {
@@ -810,6 +812,8 @@ inline std::string get_exchange_type_name(ExchangeType idx)
{
return "ADAPTIVE_PASSTHROUGH";
case ExchangeType::PASS_TO_ONE:
return "PASS_TO_ONE";
+ case ExchangeType::LOCAL_MERGE_SORT:
+ return "LOCAL_MERGE_SORT";
}
LOG(FATAL) << "__builtin_unreachable";
__builtin_unreachable();
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index d00d9521196..11633f4fcf2 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -26,7 +26,9 @@
#include "common/status.h"
#include "exchange_sink_buffer.h"
+#include "pipeline/dependency.h"
#include "pipeline/exec/operator.h"
+#include "pipeline/exec/sort_source_operator.h"
#include "pipeline/local_exchange/local_exchange_sink_operator.h"
#include "vec/columns/column_const.h"
#include "vec/exprs/vexpr.h"
@@ -294,7 +296,8 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
_tablet_sink_partition(sink.tablet_sink_partition),
_tablet_sink_location(sink.tablet_sink_location),
_tablet_sink_tuple_id(sink.tablet_sink_tuple_id),
- _tablet_sink_txn_id(sink.tablet_sink_txn_id) {
+ _tablet_sink_txn_id(sink.tablet_sink_txn_id),
+ _enable_local_merge_sort(state->enable_local_merge_sort()) {
DCHECK_GT(destinations.size(), 0);
DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED ||
sink.output_partition.type == TPartitionType::HASH_PARTITIONED ||
@@ -648,4 +651,17 @@ Status ExchangeSinkLocalState::close(RuntimeState* state,
Status exec_status) {
return Base::close(state, exec_status);
}
+// Decides which local exchange, if any, must precede this data-stream sink.
+// When local merge sort is enabled for the session and the direct child is a sort source
+// planned with merge_by_exchange (use_local_merge()), the plan
+//   SORT_OPERATOR -> DATA_STREAM_SINK_OPERATOR
+// is turned into
+//   SORT_OPERATOR -> LOCAL_MERGE_SORT -> DATA_STREAM_SINK_OPERATOR
+// Otherwise the generic sink distribution is used.
+DataDistribution ExchangeSinkOperatorX::required_data_distribution() const {
+    if (!_enable_local_merge_sort || !_child_x) {
+        return DataSinkOperatorX<ExchangeSinkLocalState>::required_data_distribution();
+    }
+    const auto sort_child = std::dynamic_pointer_cast<SortSourceOperatorX>(_child_x);
+    const bool merge_sorted_runs_locally = sort_child != nullptr && sort_child->use_local_merge();
+    if (merge_sorted_runs_locally) {
+        return ExchangeType::LOCAL_MERGE_SORT;
+    }
+    return DataSinkOperatorX<ExchangeSinkLocalState>::required_data_distribution();
+}
+
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h
b/be/src/pipeline/exec/exchange_sink_operator.h
index 0445dafacc0..1c14f04679e 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -218,6 +218,7 @@ public:
Status serialize_block(ExchangeSinkLocalState& stete, vectorized::Block*
src, PBlock* dest,
int num_receivers = 1);
+ DataDistribution required_data_distribution() const override;
private:
friend class ExchangeSinkLocalState;
@@ -270,6 +271,7 @@ private:
// Control the number of channels according to the flow, thereby
controlling the number of table sink writers.
size_t _data_processed = 0;
int _writer_count = 1;
+ const bool _enable_local_merge_sort;
};
} // namespace pipeline
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index f78d08eeb9e..38b0b892f2f 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -113,6 +113,7 @@ public:
virtual Status revoke_memory(RuntimeState* state) { return Status::OK(); }
[[nodiscard]] virtual bool require_data_distribution() const { return
false; }
+    // Returns this operator's child (may be nullptr, e.g. for the spill-sort source).
+    // Returned by const reference to avoid an atomic refcount round-trip on every call;
+    // callers that need to keep the pointer can still copy it.
+    [[nodiscard]] const OperatorXPtr& child_x() const { return _child_x; }
protected:
OperatorXPtr _child_x = nullptr;
diff --git a/be/src/pipeline/exec/sort_source_operator.cpp
b/be/src/pipeline/exec/sort_source_operator.cpp
index 34bfffb8d9f..89262828708 100644
--- a/be/src/pipeline/exec/sort_source_operator.cpp
+++ b/be/src/pipeline/exec/sort_source_operator.cpp
@@ -28,7 +28,35 @@ SortLocalState::SortLocalState(RuntimeState* state,
OperatorXBase* parent)
+// Reads the sort-node flags used by the local-merge path: merge_by_exchange (exposed via
+// use_local_merge()) and the TOPN offset, which defaults to 0 when the plan does not set it.
SortSourceOperatorX::SortSourceOperatorX(ObjectPool* pool, const TPlanNode&
tnode, int operator_id,
const DescriptorTbl& descs)
- : OperatorX<SortLocalState>(pool, tnode, operator_id, descs) {}
+ : OperatorX<SortLocalState>(pool, tnode, operator_id, descs),
+ _merge_by_exchange(tnode.sort_node.merge_by_exchange),
+ _offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0)
{}
+
+// Initializes the base operator, then the ordering expressions and the per-key ASC/DESC and
+// NULLS FIRST flags from the plan's sort_info; build_merger() uses all three.
+Status SortSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
+    RETURN_IF_ERROR(Base::init(tnode, state));
+    const auto& sort_info = tnode.sort_node.sort_info;
+    RETURN_IF_ERROR(_vsort_exec_exprs.init(sort_info, _pool));
+    _is_asc_order = sort_info.is_asc_order;
+    _nulls_first = sort_info.nulls_first;
+    return Status::OK();
+}
+
+// Prepares the base operator and binds the ordering expressions to the child's row layout.
+Status SortSourceOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(Base::prepare(state));
+    // The spill-sort source may have no child, and then there is no input row descriptor
+    // to prepare the ordering expressions against.
+    if (_child_x == nullptr) {
+        return Status::OK();
+    }
+    RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _child_x->row_desc(), _row_descriptor));
+    return Status::OK();
+}
+
+// Opens the base operator and, when a child exists, the ordering expressions.
+Status SortSourceOperatorX::open(RuntimeState* state) {
+    RETURN_IF_ERROR(Base::open(state));
+    // Mirrors prepare(): the spill-sort source may have no child, in which case the
+    // ordering expressions were never prepared and are not opened either.
+    if (_child_x == nullptr) {
+        return Status::OK();
+    }
+    RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
+    return Status::OK();
+}
Status SortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block*
block, bool* eos) {
auto& local_state = get_local_state(state);
@@ -45,4 +73,17 @@ const vectorized::SortDescription&
SortSourceOperatorX::get_sort_description(
return local_state._shared_state->sorter->get_sort_description();
}
+// Creates a VSortedRunMerger configured like this sort node: ordering expressions, ASC/DESC and
+// NULLS FIRST flags, batch size, limit and offset. Called from
+// LocalMergeSortExchanger::build_merger.
+// The ordering expressions are cloned into a local VSortExecExprs, presumably so the merger does
+// not share expression contexts with this operator.
+// NOTE(review): `cloned_exprs` goes out of scope on return. This assumes VSortedRunMerger keeps
+// its own copy of the expression-context list rather than a reference to it; confirm.
+Status SortSourceOperatorX::build_merger(RuntimeState* state,
+                                         std::unique_ptr<vectorized::VSortedRunMerger>& merger,
+                                         RuntimeProfile* profile) {
+    vectorized::VSortExecExprs cloned_exprs;
+    RETURN_IF_ERROR(_vsort_exec_exprs.clone(state, cloned_exprs));
+    const auto& ordering_ctxs = cloned_exprs.lhs_ordering_expr_ctxs();
+    merger = std::make_unique<vectorized::VSortedRunMerger>(ordering_ctxs, _is_asc_order,
+                                                            _nulls_first, state->batch_size(),
+                                                            _limit, _offset, profile);
+    return Status::OK();
+}
+
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/sort_source_operator.h
b/be/src/pipeline/exec/sort_source_operator.h
index f20e8b9314b..86832e04ae0 100644
--- a/be/src/pipeline/exec/sort_source_operator.h
+++ b/be/src/pipeline/exec/sort_source_operator.h
@@ -28,6 +28,8 @@ class RuntimeState;
namespace pipeline {
class SortSourceOperatorX;
+class SortSinkOperatorX;
+
class SortLocalState final : public PipelineXLocalState<SortSharedState> {
public:
ENABLE_FACTORY_CREATOR(SortLocalState);
@@ -40,16 +42,31 @@ private:
class SortSourceOperatorX final : public OperatorX<SortLocalState> {
public:
+ using Base = OperatorX<SortLocalState>;
SortSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int
operator_id,
const DescriptorTbl& descs);
Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos)
override;
+ Status init(const TPlanNode& tnode, RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
+ Status open(RuntimeState* state) override;
+
bool is_source() const override { return true; }
+ // True when the plan marks this sort as merged by an exchange (sort_node.merge_by_exchange);
+ // ExchangeSinkOperatorX uses it to request a LOCAL_MERGE_SORT local exchange.
+ bool use_local_merge() const { return _merge_by_exchange; }
const vectorized::SortDescription& get_sort_description(RuntimeState*
state) const;
+ // Creates a merger over already-sorted runs using this node's ordering expressions,
+ // sort directions, limit and offset. Used by LocalMergeSortExchanger.
+ Status build_merger(RuntimeState* state,
std::unique_ptr<vectorized::VSortedRunMerger>& merger,
+ RuntimeProfile* profile);
+
private:
friend class SortLocalState;
+ // Copied from tnode.sort_node.merge_by_exchange.
+ const bool _merge_by_exchange;
+ // Per ordering key: ascending flag and NULLS FIRST flag, from sort_info.
+ std::vector<bool> _is_asc_order;
+ std::vector<bool> _nulls_first;
+ // Ordering expressions from sort_info; cloned by build_merger() for each merger it creates.
+ vectorized::VSortExecExprs _vsort_exec_exprs;
+ // TOPN offset from the sort node (0 when unset).
+ const int64_t _offset;
};
} // namespace pipeline
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
index c8c165e4c90..a310b921b18 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
@@ -80,6 +80,7 @@ Status LocalExchangeSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo
SCOPED_TIMER(_init_timer);
_compute_hash_value_timer = ADD_TIMER(profile(), "ComputeHashValueTime");
_distribute_timer = ADD_TIMER(profile(), "DistributeDataTime");
+ _channel_id = info.task_idx;
return Status::OK();
}
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
index 10234fe3043..36530bc8ef1 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
@@ -30,6 +30,7 @@ class ShuffleExchanger;
class PassthroughExchanger;
class BroadcastExchanger;
class PassToOneExchanger;
+class LocalMergeSortExchanger;
class LocalExchangeSinkOperatorX;
class LocalExchangeSinkLocalState final : public
PipelineXSinkLocalState<LocalExchangeSharedState> {
public:
@@ -44,6 +45,13 @@ public:
Status open(RuntimeState* state) override;
Status close(RuntimeState* state, Status exec_status) override;
std::string debug_string(int indentation_level) const override;
+    // Base dependencies, plus an optional per-channel dependency supplied by the exchanger.
+    // LocalMergeSortExchanger, for example, returns its back-pressure dependency for this channel.
+    std::vector<Dependency*> dependencies() const override {
+        auto result = Base::dependencies();
+        const auto channel_dep = _exchanger->get_local_state_dependency(_channel_id);
+        if (channel_dep != nullptr) {
+            result.push_back(channel_dep.get());
+        }
+        return result;
+    }
private:
friend class LocalExchangeSinkOperatorX;
@@ -52,6 +60,7 @@ private:
friend class PassthroughExchanger;
friend class BroadcastExchanger;
friend class PassToOneExchanger;
+ friend class LocalMergeSortExchanger;
friend class AdaptivePassthroughExchanger;
Exchanger* _exchanger = nullptr;
diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.h
b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
index ec662178dea..f32261cd574 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
@@ -26,6 +26,7 @@ class ShuffleExchanger;
class PassthroughExchanger;
class BroadcastExchanger;
class PassToOneExchanger;
+class LocalMergeSortExchanger;
class LocalExchangeSourceOperatorX;
class LocalExchangeSourceLocalState final : public
PipelineXLocalState<LocalExchangeSharedState> {
public:
@@ -46,6 +47,7 @@ private:
friend class PassthroughExchanger;
friend class BroadcastExchanger;
friend class PassToOneExchanger;
+ friend class LocalMergeSortExchanger;
friend class AdaptivePassthroughExchanger;
Exchanger* _exchanger = nullptr;
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp
b/be/src/pipeline/local_exchange/local_exchanger.cpp
index a7a21edbb58..980078b8fe8 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -17,6 +17,9 @@
#include "pipeline/local_exchange/local_exchanger.h"
+#include "common/status.h"
+#include "pipeline/exec/sort_sink_operator.h"
+#include "pipeline/exec/sort_source_operator.h"
#include "pipeline/local_exchange/local_exchange_sink_operator.h"
#include "pipeline/local_exchange/local_exchange_source_operator.h"
#include "vec/runtime/partitioner.h"
@@ -262,6 +265,98 @@ Status PassToOneExchanger::get_block(RuntimeState* state,
vectorized::Block* blo
return Status::OK();
}
+// Moves the incoming (already sorted) block into this sink channel's queue and wakes the source
+// on channel 0, which performs the merge.
+// A recycled block from _free_blocks is reused as the container when available. `in_block` is
+// left holding that recycled (or freshly cloned empty) block after the swap.
+Status LocalMergeSortExchanger::sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
+                                     LocalExchangeSinkLocalState& local_state) {
+    vectorized::Block new_block;
+    if (!_free_blocks.try_dequeue(new_block)) {
+        new_block = {in_block->clone_empty()};
+    }
+    new_block.swap(*in_block);
+    const auto channel_id = local_state._channel_id;
+    // channel_id is used as an index, so it must be strictly less than the queue count.
+    DCHECK_LT(channel_id, _data_queue.size());
+    // Account for the block's memory BEFORE handing it to the queue. After std::move the local
+    // is in a moved-from state, so its size no longer reflects the queued data. Accounting first
+    // also keeps the consumer's matching sub_mem_usage() from running ahead of this add.
+    add_mem_usage(local_state, new_block.allocated_bytes());
+    _data_queue[channel_id].enqueue(std::move(new_block));
+    local_state._shared_state->set_ready_to_read(0);
+    return Status::OK();
+}
+
+// Builds the k-way merger over all sink channels. Each channel gets a BlockSupplier that pulls
+// the next sorted block from that channel's queue.
+// - If a block is dequeued, its container is recycled into _free_blocks (subject to the
+//   free-block cap) and the channel's memory usage is reduced.
+// - If the queue is empty, eos is reported only once no sink operator is running any more.
+//   Otherwise the supplier returns without a block and without eos.
+Status LocalMergeSortExchanger::build_merger(RuntimeState* state,
+                                             LocalExchangeSourceLocalState& local_state) {
+    RETURN_IF_ERROR(_sort_source->build_merger(state, _merger, local_state.profile()));
+    std::vector<vectorized::BlockSupplier> suppliers;
+    suppliers.reserve(_num_partitions);
+    for (int channel = 0; channel < _num_partitions; ++channel) {
+        suppliers.emplace_back([this, &local_state, channel](vectorized::Block* out, bool* eos) {
+            // Sample the running-sink counter BEFORE polling the queue. Observing zero first means
+            // a failed dequeue really is the end of this channel. This assumes a sink's final
+            // enqueue happens before it stops being counted as running; confirm.
+            const bool all_sinks_done = _running_sink_operators == 0;
+            vectorized::Block next_block;
+            if (_data_queue[channel].try_dequeue(next_block)) {
+                out->swap(next_block);
+                if (_free_block_limit == 0 ||
+                    _free_blocks.size_approx() < _free_block_limit * _num_sources) {
+                    _free_blocks.enqueue(std::move(next_block));
+                }
+                sub_mem_usage(local_state, channel, out->allocated_bytes());
+            } else if (all_sinks_done) {
+                *eos = true;
+            }
+            return Status::OK();
+        });
+    }
+    RETURN_IF_ERROR(_merger->prepare(suppliers));
+    _merger->set_pipeline_engine_enabled(true);
+    return Status::OK();
+}
+
+/*
+ * LOCAL_MERGE_SORT data flow, illustrated with three groups of 8 sort instances.
+ *
+ * Before: every sort instance had its own data-stream sink, so the global merge on the
+ * receiving side had to merge 24 sorted streams ([0,23]):
+ *     sort(8) --> datasink(8) [0,7]   ---->
+ *     sort(8) --> datasink(8) [8,15]  ---->  [0,23] global merge ----> Exchange(1)
+ *     sort(8) --> datasink(8) [16,23] ---->
+ *
+ * Now: the outputs of each group are first merged locally into one stream, so the global
+ * merge only sees 3 streams ([0,2]):
+ *     sort(8) --> local merge(1) ---> datasink(1) [0] ---->
+ *     sort(8) --> local merge(1) ---> datasink(1) [1] ---->  [0,2] global merge ----> Exchange(1)
+ *     sort(8) --> local merge(1) ---> datasink(1) [2] ---->
+ */
+Status LocalMergeSortExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
+                                          LocalExchangeSourceLocalState& local_state) {
+    // All sorted runs are merged into channel 0; other source channels have nothing to emit.
+    const bool is_output_channel = local_state._channel_id == 0;
+    if (!is_output_channel) {
+        *eos = true;
+        return Status::OK();
+    }
+    // The merger is built lazily on the first pull from channel 0.
+    if (_merger == nullptr) {
+        RETURN_IF_ERROR(build_merger(state, local_state));
+    }
+    RETURN_IF_ERROR(_merger->get_next(block, eos));
+    return Status::OK();
+}
+
+// Charges `delta` bytes to the sink's channel and blocks that channel's sink dependency once the
+// channel's queued bytes exceed _each_queue_limit.
+void LocalMergeSortExchanger::add_mem_usage(LocalExchangeSinkLocalState& local_state,
+                                            int64_t delta) {
+    const auto channel_id = local_state._channel_id;
+    local_state._shared_state->mem_trackers[channel_id]->consume(delta);
+    // fetch_add returns the PREVIOUS value; compare the updated usage against the limit.
+    if (_queues_mem_usege[channel_id].fetch_add(delta) + delta > _each_queue_limit) {
+        _sink_deps[channel_id]->block();
+    }
+}
+
+// Releases `delta` bytes from `channel_id` and makes that channel's sink dependency ready again
+// once its queued bytes are back within _each_queue_limit.
+void LocalMergeSortExchanger::sub_mem_usage(LocalExchangeSourceLocalState& local_state,
+                                            int channel_id, int64_t delta) {
+    local_state._shared_state->mem_trackers[channel_id]->release(delta);
+    // fetch_sub returns the PREVIOUS value, so compare the updated usage. Comparing the previous
+    // value would leave the sink blocked after the queue drains whenever the last dequeued block
+    // alone exceeded the limit. Nothing would wake the sink again, and the query would hang.
+    if (_queues_mem_usege[channel_id].fetch_sub(delta) - delta <= _each_queue_limit) {
+        _sink_deps[channel_id]->set_ready();
+    }
+}
+
Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block*
in_block, bool eos,
LocalExchangeSinkLocalState& local_state) {
for (size_t i = 0; i < _num_partitions; i++) {
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h
b/be/src/pipeline/local_exchange/local_exchanger.h
index bc07c806094..806ac8b9131 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -25,6 +25,7 @@ namespace doris::pipeline {
class LocalExchangeSourceLocalState;
class LocalExchangeSinkLocalState;
struct ShuffleBlockWrapper;
+class SortSourceOperatorX;
class Exchanger {
public:
@@ -50,6 +51,8 @@ public:
virtual ExchangeType get_type() const = 0;
virtual void close(LocalExchangeSourceLocalState& local_state) {}
+    // Returns an extra dependency that the local-exchange sink of the given channel must wait on,
+    // or nullptr when the exchanger applies no per-channel back-pressure (the default).
+    virtual DependencySPtr get_local_state_dependency(int /*channel_id*/) { return nullptr; }
+
protected:
friend struct LocalExchangeSharedState;
friend struct ShuffleBlockWrapper;
@@ -177,6 +180,50 @@ private:
std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _data_queue;
};
+// Local exchanger that merges the sorted outputs of all local sort instances into one sorted
+// stream.
+// - Each sink channel pushes its blocks into its own queue.
+// - The source on channel 0 runs a VSortedRunMerger over all queues; the other source channels
+//   emit nothing.
+// - Queued bytes are tracked per channel. A channel's sink dependency is blocked when its usage
+//   goes over _each_queue_limit (see add_mem_usage / sub_mem_usage).
+class LocalMergeSortExchanger final : public Exchanger {
+public:
+    ENABLE_FACTORY_CREATOR(LocalMergeSortExchanger);
+    LocalMergeSortExchanger(std::shared_ptr<SortSourceOperatorX> sort_source,
+                            int running_sink_operators, int num_partitions, int free_block_limit)
+            : Exchanger(running_sink_operators, num_partitions, free_block_limit),
+              _sort_source(std::move(sort_source)),
+              _queues_mem_usege(num_partitions),
+              _each_queue_limit(config::local_exchange_buffer_mem_limit / num_partitions) {
+        _data_queue.resize(num_partitions);
+        _sink_deps.reserve(num_partitions);
+        // int index: num_partitions is an int, so avoid a signed/unsigned comparison.
+        for (int i = 0; i < num_partitions; i++) {
+            _queues_mem_usege[i] = 0;
+            _sink_deps.push_back(
+                    std::make_shared<Dependency>(0, 0, "LOCAL_MERGE_SORT_DEPENDENCY", true));
+        }
+    }
+    ~LocalMergeSortExchanger() override = default;
+    Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
+                LocalExchangeSinkLocalState& local_state) override;
+
+    Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
+                     LocalExchangeSourceLocalState& local_state) override;
+    ExchangeType get_type() const override { return ExchangeType::LOCAL_MERGE_SORT; }
+
+    // Creates _merger and registers one BlockSupplier per channel queue.
+    Status build_merger(RuntimeState* state, LocalExchangeSourceLocalState& local_state);
+
+    // Back-pressure dependency of the sink writing to `channel_id`.
+    DependencySPtr get_local_state_dependency(int channel_id) override {
+        DCHECK_GE(channel_id, 0);
+        DCHECK_LT(channel_id, _sink_deps.size());
+        DCHECK(_sink_deps[channel_id]);
+        return _sink_deps[channel_id];
+    }
+
+    void add_mem_usage(LocalExchangeSinkLocalState& local_state, int64_t delta);
+
+    void sub_mem_usage(LocalExchangeSourceLocalState& local_state, int channel_id, int64_t delta);
+
+private:
+    // One queue per sink channel holding that channel's sorted blocks.
+    std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _data_queue;
+    // Built lazily by build_merger() on the first get_block() of channel 0.
+    std::unique_ptr<vectorized::VSortedRunMerger> _merger;
+    // Supplies the sort configuration (ordering exprs, directions, limit/offset) for the merger.
+    std::shared_ptr<SortSourceOperatorX> _sort_source;
+    // Per-channel sink dependencies returned by get_local_state_dependency().
+    std::vector<DependencySPtr> _sink_deps;
+    // Bytes currently queued per channel.
+    std::vector<std::atomic_int64_t> _queues_mem_usege;
+    // Per-channel byte budget: local_exchange_buffer_mem_limit split evenly across channels.
+    const int64_t _each_queue_limit;
+};
+
class BroadcastExchanger final : public Exchanger {
public:
ENABLE_FACTORY_CREATOR(BroadcastExchanger);
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 9ddbd1b9150..a85b64c9154 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -37,6 +37,7 @@
#include "common/logging.h"
#include "common/status.h"
#include "io/fs/stream_load_pipe.h"
+#include "pipeline/dependency.h"
#include "pipeline/exec/aggregation_sink_operator.h"
#include "pipeline/exec/aggregation_source_operator.h"
#include "pipeline/exec/analytic_sink_operator.h"
@@ -90,6 +91,7 @@
#include "pipeline/exec/union_source_operator.h"
#include "pipeline/local_exchange/local_exchange_sink_operator.h"
#include "pipeline/local_exchange/local_exchange_source_operator.h"
+#include "pipeline/local_exchange/local_exchanger.h"
#include "pipeline/task_scheduler.h"
#include "pipeline_task.h"
#include "runtime/exec_env.h"
@@ -742,6 +744,21 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
?
_runtime_state->query_options().local_exchange_free_blocks_limit
: 0);
break;
+    case ExchangeType::LOCAL_MERGE_SORT: {
+        // The local merge consumes the sorted runs of the sort source feeding this pipeline's
+        // sink, so that child must be a SortSourceOperatorX.
+        auto child_op = cur_pipe->sink_x()->child_x();
+        auto sort_source = std::dynamic_pointer_cast<SortSourceOperatorX>(child_op);
+        if (!sort_source) {
+            // child_op itself may be null here; do not dereference it while building the error.
+            return Status::InternalError(
+                    "LOCAL_MERGE_SORT must use in SortSourceOperatorX , but now is {} ",
+                    child_op ? child_op->get_name() : std::string("nullptr"));
+        }
+        shared_state->exchanger = LocalMergeSortExchanger::create_unique(
+                sort_source, cur_pipe->num_tasks(), _num_instances,
+                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
+                        ? _runtime_state->query_options().local_exchange_free_blocks_limit
+                        : 0);
+        break;
+    }
case ExchangeType::ADAPTIVE_PASSTHROUGH:
shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(
cur_pipe->num_tasks(), _num_instances,
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 565e5ccdc20..2b303603e7a 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -597,6 +597,11 @@ public:
return _query_options.__isset.enable_force_spill &&
_query_options.enable_force_spill;
}
+    // Local merge sort is on only when the FE sent the option and set it to true.
+    bool enable_local_merge_sort() const {
+        if (!_query_options.__isset.enable_local_merge_sort) {
+            return false;
+        }
+        return _query_options.enable_local_merge_sort;
+    }
+
int64_t min_revocable_mem() const {
if (_query_options.__isset.min_revocable_mem) {
return _query_options.min_revocable_mem;
diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h
index 8681dc4ffc6..d92718298cb 100644
--- a/be/src/vec/core/sort_cursor.h
+++ b/be/src/vec/core/sort_cursor.h
@@ -195,6 +195,9 @@ struct BlockSupplierSortCursorImpl : public
MergeSortCursorImpl {
}
bool has_next_block() override {
+ if (_is_eof) {
+ return false;
+ }
_block.clear();
Status status;
do {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index c3d9ce287c2..ab412b7dd08 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -252,6 +252,8 @@ public class SessionVariable implements Serializable,
Writable {
public static final String FORCE_TO_LOCAL_SHUFFLE =
"force_to_local_shuffle";
+ public static final String ENABLE_LOCAL_MERGE_SORT =
"enable_local_merge_sort";
+
public static final String ENABLE_AGG_STATE = "enable_agg_state";
public static final String ENABLE_BUCKET_SHUFFLE_DOWNGRADE =
"enable_bucket_shuffle_downgrade";
@@ -973,6 +975,9 @@ public class SessionVariable implements Serializable,
Writable {
"Whether to force to local shuffle on pipelineX
engine."})
private boolean forceToLocalShuffle = false;
+ @VariableMgr.VarAttr(name = ENABLE_LOCAL_MERGE_SORT)
+ private boolean enableLocalMergeSort = true;
+
@VariableMgr.VarAttr(name = ENABLE_AGG_STATE, fuzzy = false, varType =
VariableAnnotation.EXPERIMENTAL,
needForward = true)
public boolean enableAggState = false;
@@ -3391,6 +3396,7 @@ public class SessionVariable implements Serializable,
Writable {
tResult.setMinRevocableMem(minRevocableMem);
tResult.setDataQueueMaxBlocks(dataQueueMaxBlocks);
+ tResult.setEnableLocalMergeSort(enableLocalMergeSort);
return tResult;
}
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index 586d40b8648..3619bf9d97d 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -299,6 +299,8 @@ struct TQueryOptions {
111: optional bool enable_orc_filter_by_min_max = true
112: optional i32 max_column_reader_num = 0
+
+ 113: optional bool enable_local_merge_sort = false;
// For cloud, to control if the content would be written into file cache
1000: optional bool disable_file_cache = false
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]