This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new de8442bbef2 [refactor](pipeline) Refactor local exchange planning (#42482)
de8442bbef2 is described below
commit de8442bbef2e4ee91d2815f8bf2bca8886bf3235
Author: Gabriel <[email protected]>
AuthorDate: Tue Oct 29 14:04:20 2024 +0800
[refactor](pipeline) Refactor local exchange planning (#42482)
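
This replaces the per-operator ignore_data_distribution() /
ignore_data_hash_distribution() flags (and set_ignore_data_distribution())
with a single is_serial_operator() property driven by
TPlanNode.is_serial_operator. required_data_distribution() and
require_shuffled_data_distribution() move onto OperatorBase, and the latter
is now derived from the former via Pipeline::is_hash_exchange(). The
TPlanFragment.use_serial_source field is dropped; the BE derives
_use_serial_source from the fragment's source (leaf) operator instead, and
ShuffleExchanger no longer needs an ignore_source_data_distribution flag.

After this change an operator only declares the distribution it needs and the
base class decides whether that counts as a shuffled distribution. A rough
sketch of a sink written against the refactored interface (ExampleSinkOperatorX
is illustrative only, not part of this patch):

    // In a hypothetical ExampleSinkOperatorX: declare the wanted distribution.
    DataDistribution ExampleSinkOperatorX::required_data_distribution() const {
        return _is_colocate
                       ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
                       : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
    }
    // No require_shuffled_data_distribution() override is needed any more:
    // OperatorBase derives it as
    // Pipeline::is_hash_exchange(required_data_distribution().distribution_type).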
---
be/src/pipeline/exec/aggregation_sink_operator.h | 1 -
be/src/pipeline/exec/analytic_sink_operator.h | 3 --
be/src/pipeline/exec/datagen_operator.cpp | 8 +--
.../exec/distinct_streaming_aggregation_operator.h | 3 --
be/src/pipeline/exec/exchange_source_operator.cpp | 4 +-
be/src/pipeline/exec/exchange_source_operator.h | 2 +-
be/src/pipeline/exec/hashjoin_build_sink.h | 7 +--
be/src/pipeline/exec/hashjoin_probe_operator.h | 3 --
.../exec/nested_loop_join_build_operator.h | 4 +-
be/src/pipeline/exec/operator.cpp | 10 +++-
be/src/pipeline/exec/operator.h | 21 +-------
.../exec/partitioned_aggregation_sink_operator.h | 3 --
.../exec/partitioned_hash_join_probe_operator.h | 3 --
.../exec/partitioned_hash_join_sink_operator.h | 3 --
be/src/pipeline/exec/scan_operator.cpp | 6 ++-
be/src/pipeline/exec/scan_operator.h | 4 +-
be/src/pipeline/exec/set_probe_sink_operator.h | 2 -
be/src/pipeline/exec/set_sink_operator.h | 1 -
be/src/pipeline/exec/sort_sink_operator.h | 1 -
.../local_exchange_source_operator.h | 3 --
be/src/pipeline/local_exchange/local_exchanger.cpp | 2 +-
be/src/pipeline/local_exchange/local_exchanger.h | 11 ++--
be/src/pipeline/pipeline.cpp | 21 ++++----
be/src/pipeline/pipeline.h | 2 +-
be/src/pipeline/pipeline_fragment_context.cpp | 62 +++++-----------------
be/src/pipeline/pipeline_fragment_context.h | 9 ++--
.../org/apache/doris/planner/PlanFragment.java | 1 -
.../java/org/apache/doris/qe/SessionVariable.java | 4 +-
gensrc/thrift/Planner.thrift | 4 --
.../distribute/local_shuffle.groovy | 2 +-
30 files changed, 64 insertions(+), 146 deletions(-)
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h
index 8271f1451b4..9ff3de99b22 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -152,7 +152,6 @@ public:
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
bool require_data_distribution() const override { return _is_colocate; }
- bool require_shuffled_data_distribution() const override { return !_probe_expr_ctxs.empty(); }
size_t get_revocable_mem_size(RuntimeState* state) const;
AggregatedDataVariants* get_agg_data(RuntimeState* state) {
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h
index 1a0a671cf9f..b35354107f6 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -88,9 +88,6 @@ public:
}
bool require_data_distribution() const override { return true; }
- bool require_shuffled_data_distribution() const override {
- return !_partition_by_eq_expr_ctxs.empty();
- }
private:
Status _insert_range_column(vectorized::Block* block, const vectorized::VExprContextSPtr& expr,
diff --git a/be/src/pipeline/exec/datagen_operator.cpp b/be/src/pipeline/exec/datagen_operator.cpp
index faa6359e874..965092b7eef 100644
--- a/be/src/pipeline/exec/datagen_operator.cpp
+++ b/be/src/pipeline/exec/datagen_operator.cpp
@@ -36,7 +36,9 @@ DataGenSourceOperatorX::DataGenSourceOperatorX(ObjectPool* pool, const TPlanNode
: OperatorX<DataGenLocalState>(pool, tnode, operator_id, descs),
_tuple_id(tnode.data_gen_scan_node.tuple_id),
_tuple_desc(nullptr),
- _runtime_filter_descs(tnode.runtime_filters) {}
+ _runtime_filter_descs(tnode.runtime_filters) {
+ _is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator;
+}
Status DataGenSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(OperatorX<DataGenLocalState>::init(tnode, state));
@@ -87,8 +89,8 @@ Status DataGenLocalState::init(RuntimeState* state, LocalStateInfo& info) {
// TODO: use runtime filter to filte result block, maybe this node need derive from vscan_node.
for (const auto& filter_desc : p._runtime_filter_descs) {
std::shared_ptr<IRuntimeFilter> runtime_filter;
- RETURN_IF_ERROR(state->register_consumer_runtime_filter(
- filter_desc, p.ignore_data_distribution(), p.node_id(), &runtime_filter));
+ RETURN_IF_ERROR(state->register_consumer_runtime_filter(filter_desc, p.is_serial_operator(),
+ p.node_id(), &runtime_filter));
runtime_filter->init_profile(_runtime_profile.get());
}
return Status::OK();
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
index 1f7a21190ad..4c5fcd5efa7 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
@@ -116,9 +116,6 @@ public:
}
bool require_data_distribution() const override { return _is_colocate; }
- bool require_shuffled_data_distribution() const override {
- return _needs_finalize || (!_probe_expr_ctxs.empty() && !_is_streaming_preagg);
- }
private:
friend class DistinctStreamingAggLocalState;
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp
index 844e6decd64..c9eebc5d2e4 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -105,7 +105,9 @@ ExchangeSourceOperatorX::ExchangeSourceOperatorX(ObjectPool* pool, const TPlanNo
std::vector<bool>(tnode.nullable_tuples.begin(),
tnode.nullable_tuples.begin() +
tnode.exchange_node.input_row_tuples.size())),
- _offset(tnode.exchange_node.__isset.offset ? tnode.exchange_node.offset : 0) {}
+ _offset(tnode.exchange_node.__isset.offset ? tnode.exchange_node.offset : 0) {
+ _is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator;
+}
Status ExchangeSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(OperatorX<ExchangeLocalState>::init(tnode, state));
diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h
index 0fe3dcbb590..c8ef674d269 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -81,7 +81,7 @@ public:
[[nodiscard]] bool is_merging() const { return _is_merging; }
DataDistribution required_data_distribution() const override {
- if (OperatorX<ExchangeLocalState>::ignore_data_distribution()) {
+ if (OperatorX<ExchangeLocalState>::is_serial_operator()) {
return {ExchangeType::NOOP};
}
return _partition_type == TPartitionType::HASH_PARTITIONED
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h
index 69aa6843b84..83755d7f730 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -130,8 +130,8 @@ public:
if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
return {ExchangeType::NOOP};
} else if (_is_broadcast_join) {
- return _child->ignore_data_distribution() ? DataDistribution(ExchangeType::PASS_TO_ONE)
- : DataDistribution(ExchangeType::NOOP);
+ return _child->is_serial_operator() ? DataDistribution(ExchangeType::PASS_TO_ONE)
+ : DataDistribution(ExchangeType::NOOP);
}
return _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
_join_distribution == TJoinDistributionType::COLOCATE
@@ -139,9 +139,6 @@ public:
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
- bool require_shuffled_data_distribution() const override {
- return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !_is_broadcast_join;
- }
bool is_shuffled_operator() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h
index 917c2692b44..7da7a3b238d 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -152,9 +152,6 @@ public:
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs));
}
- bool require_shuffled_data_distribution() const override {
- return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !_is_broadcast_join;
- }
bool is_shuffled_operator() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.h b/be/src/pipeline/exec/nested_loop_join_build_operator.h
index f2ca259754b..d6e72799f97 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.h
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.h
@@ -76,8 +76,8 @@ public:
if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
return {ExchangeType::NOOP};
}
- return _child->ignore_data_distribution() ? DataDistribution(ExchangeType::BROADCAST)
- : DataDistribution(ExchangeType::NOOP);
+ return _child->is_serial_operator() ? DataDistribution(ExchangeType::BROADCAST)
+ : DataDistribution(ExchangeType::NOOP);
}
private:
diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp
index 6e3099db748..fb2dd828c39 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -74,6 +74,7 @@
#include "pipeline/exec/union_source_operator.h"
#include "pipeline/local_exchange/local_exchange_sink_operator.h"
#include "pipeline/local_exchange/local_exchange_source_operator.h"
+#include "pipeline/pipeline.h"
#include "util/debug_util.h"
#include "util/runtime_profile.h"
#include "util/string_util.h"
@@ -116,11 +117,16 @@ std::string PipelineXSinkLocalState<SharedStateArg>::name_suffix() {
}() + ")";
}
-DataDistribution DataSinkOperatorXBase::required_data_distribution() const {
- return _child && _child->ignore_data_distribution()
+DataDistribution OperatorBase::required_data_distribution() const {
+ return _child && _child->is_serial_operator() && !is_source()
? DataDistribution(ExchangeType::PASSTHROUGH)
: DataDistribution(ExchangeType::NOOP);
}
+
+bool OperatorBase::require_shuffled_data_distribution() const {
+ return Pipeline::is_hash_exchange(required_data_distribution().distribution_type);
+}
+
const RowDescriptor& OperatorBase::row_desc() const {
return _child->row_desc();
}
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 5df0a19498f..2a2b3fdd3b9 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -118,7 +118,8 @@ public:
_followed_by_shuffled_operator = followed_by_shuffled_operator;
}
[[nodiscard]] virtual bool is_shuffled_operator() const { return false; }
- [[nodiscard]] virtual bool require_shuffled_data_distribution() const { return false; }
+ [[nodiscard]] virtual DataDistribution required_data_distribution() const;
+ [[nodiscard]] virtual bool require_shuffled_data_distribution() const;
protected:
OperatorPtr _child = nullptr;
@@ -483,7 +484,6 @@ public:
}
[[nodiscard]] virtual std::shared_ptr<BasicSharedState> create_shared_state() const = 0;
- [[nodiscard]] virtual DataDistribution required_data_distribution() const;
Status close(RuntimeState* state) override {
return Status::InternalError("Should not reach here!");
@@ -496,8 +496,6 @@ public:
[[nodiscard]] bool is_sink() const override { return true; }
- [[nodiscard]] bool is_source() const override { return false; }
-
static Status close(RuntimeState* state, Status exec_status) {
auto result = state->get_sink_local_state_result();
if (!result) {
@@ -652,19 +650,7 @@ public:
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, _op_name);
}
[[nodiscard]] std::string get_name() const override { return _op_name; }
- [[nodiscard]] virtual DataDistribution required_data_distribution() const {
- return _child && _child->ignore_data_distribution() && !is_source()
- ? DataDistribution(ExchangeType::PASSTHROUGH)
- : DataDistribution(ExchangeType::NOOP);
- }
- [[nodiscard]] virtual bool ignore_data_distribution() const {
- return _child ? _child->ignore_data_distribution() : _ignore_data_distribution;
- }
- [[nodiscard]] bool ignore_data_hash_distribution() const {
- return _child ? _child->ignore_data_hash_distribution() : _ignore_data_distribution;
- }
[[nodiscard]] virtual bool need_more_input_data(RuntimeState* state) const { return true; }
- void set_ignore_data_distribution() { _ignore_data_distribution = true; }
Status open(RuntimeState* state) override;
@@ -735,8 +721,6 @@ public:
bool has_output_row_desc() const { return _output_row_descriptor != nullptr; }
- [[nodiscard]] bool is_source() const override { return false; }
-
[[nodiscard]] virtual Status get_block_after_projects(RuntimeState* state,
vectorized::Block* block, bool* eos);
@@ -779,7 +763,6 @@ protected:
uint32_t _debug_point_count = 0;
std::string _op_name;
- bool _ignore_data_distribution = false;
int _parallel_tasks = 0;
//_keep_origin is used to avoid copying during projection,
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 6b3a74c83df..15f6b22387a 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -309,9 +309,6 @@ public:
bool require_data_distribution() const override {
return _agg_sink_operator->require_data_distribution();
}
- bool require_shuffled_data_distribution() const override {
- return _agg_sink_operator->require_shuffled_data_distribution();
- }
Status set_child(OperatorPtr child) override {
RETURN_IF_ERROR(DataSinkOperatorX<PartitionedAggSinkLocalState>::set_child(child));
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index 3aab11f62d8..f8fc0780b6f 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -165,9 +165,6 @@ public:
_distribution_partition_exprs));
}
- bool require_shuffled_data_distribution() const override {
- return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
- }
bool is_shuffled_operator() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index c768d7518b9..8e89763b50a 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -115,9 +115,6 @@ public:
_distribution_partition_exprs);
}
- bool require_shuffled_data_distribution() const override {
- return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
- }
bool is_shuffled_operator() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}
diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index 4f3c97bab71..be940e8c89c 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -73,7 +73,7 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info)
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
auto& p = _parent->cast<typename Derived::Parent>();
- RETURN_IF_ERROR(RuntimeFilterConsumer::init(state, p.ignore_data_distribution()));
+ RETURN_IF_ERROR(RuntimeFilterConsumer::init(state, p.is_serial_operator()));
// init profile for runtime filter
RuntimeFilterConsumer::_init_profile(profile());
init_runtime_filter_dependency(_filter_dependencies, p.operator_id(), p.node_id(),
@@ -990,7 +990,7 @@ Status ScanLocalState<Derived>::_start_scanners(
auto& p = _parent->cast<typename Derived::Parent>();
_scanner_ctx = vectorized::ScannerContext::create_shared(
state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(),
- _scan_dependency, p.ignore_data_distribution());
+ _scan_dependency, p.is_serial_operator());
return Status::OK();
}
@@ -1145,6 +1145,8 @@ ScanOperatorX<LocalStateType>::ScanOperatorX(ObjectPool* pool, const TPlanNode&
: OperatorX<LocalStateType>(pool, tnode, operator_id, descs),
_runtime_filter_descs(tnode.runtime_filters),
_parallel_tasks(parallel_tasks) {
+ OperatorX<LocalStateType>::_is_serial_operator =
+ tnode.__isset.is_serial_operator && tnode.is_serial_operator;
if (tnode.__isset.push_down_count) {
_push_down_count = tnode.push_down_count;
}
diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h
index bf650cb8495..e4f8a828c6e 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -383,8 +383,8 @@ public:
TPushAggOp::type get_push_down_agg_type() { return _push_down_agg_type; }
DataDistribution required_data_distribution() const override {
- if (OperatorX<LocalStateType>::ignore_data_distribution()) {
- // `ignore_data_distribution()` returns true means we ignore the distribution.
+ if (OperatorX<LocalStateType>::is_serial_operator()) {
+ // `is_serial_operator()` returns true means we ignore the distribution.
return {ExchangeType::NOOP};
}
return {ExchangeType::BUCKET_HASH_SHUFFLE};
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h
index ab53f5358c2..f320c8e89cd 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.h
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -96,8 +96,6 @@ public:
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
- bool require_shuffled_data_distribution() const override { return true; }
-
std::shared_ptr<BasicSharedState> create_shared_state() const override { return nullptr; }
private:
diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h
index 65c33795e5d..8e3c264f267 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -94,7 +94,6 @@ public:
return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
- bool require_shuffled_data_distribution() const override { return true; }
private:
template <class HashTableContext, bool is_intersected>
diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h
index 0829c38b40f..a5a24e37163 100644
--- a/be/src/pipeline/exec/sort_sink_operator.h
+++ b/be/src/pipeline/exec/sort_sink_operator.h
@@ -73,7 +73,6 @@ public:
return {ExchangeType::NOOP};
}
}
- bool require_shuffled_data_distribution() const override { return _is_analytic_sort; }
bool require_data_distribution() const override { return _is_colocate; }
size_t get_revocable_mem_size(RuntimeState* state) const;
diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
index c0da5c8120c..3c706d50182 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
@@ -81,9 +81,6 @@ public:
bool is_source() const override { return true; }
- // If input data distribution is ignored by this fragment, this first local exchange source in this fragment will re-assign all data.
- bool ignore_data_distribution() const override { return false; }
-
private:
friend class LocalExchangeSourceLocalState;
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp
index da27a39772d..c5f99ca5d6a 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -226,7 +226,7 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
new_block_wrapper->unref(local_state._shared_state, local_state._channel_id);
}
}
- } else if (_num_senders != _num_sources || _ignore_source_data_distribution) {
+ } else if (_num_senders != _num_sources) {
// In this branch, data just should be distributed equally into all instances.
new_block_wrapper->ref(_num_partitions);
for (size_t i = 0; i < _num_partitions; i++) {
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h b/be/src/pipeline/local_exchange/local_exchanger.h
index b3731638cb3..bf052ac3b92 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -218,24 +218,21 @@ public:
protected:
ShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions,
- bool ignore_source_data_distribution, int free_block_limit)
+ int free_block_limit)
: Exchanger<PartitionedBlock>(running_sink_operators, num_sources, num_partitions,
- free_block_limit),
- _ignore_source_data_distribution(ignore_source_data_distribution) {
+ free_block_limit) {
_data_queue.resize(num_partitions);
}
Status _split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids,
vectorized::Block* block, LocalExchangeSinkLocalState& local_state);
-
- const bool _ignore_source_data_distribution = false;
};
class BucketShuffleExchanger final : public ShuffleExchanger {
ENABLE_FACTORY_CREATOR(BucketShuffleExchanger);
BucketShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions,
- bool ignore_source_data_distribution, int free_block_limit)
+ int free_block_limit)
: ShuffleExchanger(running_sink_operators, num_sources, num_partitions,
- ignore_source_data_distribution, free_block_limit) {}
+ free_block_limit) {}
~BucketShuffleExchanger() override = default;
ExchangeType get_type() const override { return ExchangeType::BUCKET_HASH_SHUFFLE; }
};
diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index 5b93fbdf1f8..96da754daa5 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -39,6 +39,7 @@ bool Pipeline::need_to_local_exchange(const DataDistribution target_data_distrib
[&](OperatorPtr op) -> bool { return op->is_serial_operator(); })) {
return false;
}
+ // If all operators are serial and sink is not serial, we should improve parallelism for sink.
if (std::all_of(_operators.begin(), _operators.end(),
[&](OperatorPtr op) -> bool { return op->is_serial_operator(); })) {
if (!_sink->is_serial_operator()) {
@@ -46,21 +47,22 @@ bool Pipeline::need_to_local_exchange(const DataDistribution target_data_distrib
}
} else if (std::any_of(_operators.begin(), _operators.end(),
[&](OperatorPtr op) -> bool { return op->is_serial_operator(); })) {
+ // If non-serial operators exist, we should improve parallelism for those.
return true;
}
if (target_data_distribution.distribution_type != ExchangeType::BUCKET_HASH_SHUFFLE &&
target_data_distribution.distribution_type != ExchangeType::HASH_SHUFFLE) {
+ // Always do local exchange if non-hash-partition exchanger is required.
+ // For example, `PASSTHROUGH` exchanger is always required to distribute data evenly.
return true;
- } else if (_operators.front()->ignore_data_hash_distribution()) {
- if (_data_distribution.distribution_type == target_data_distribution.distribution_type &&
- (_data_distribution.partition_exprs.empty() ||
- target_data_distribution.partition_exprs.empty())) {
- return true;
- }
- return _data_distribution.distribution_type != target_data_distribution.distribution_type &&
- !(is_hash_exchange(_data_distribution.distribution_type) &&
- is_hash_exchange(target_data_distribution.distribution_type));
+ } else if (_operators.front()->is_serial_operator()) {
+ DCHECK(std::all_of(_operators.begin(), _operators.end(),
+ [&](OperatorPtr op) -> bool { return op->is_serial_operator(); }) &&
+ _sink->is_serial_operator())
+ << debug_string();
+ // All operators and sink are serial in this path.
+ return false;
} else {
return _data_distribution.distribution_type != target_data_distribution.distribution_type &&
!(is_hash_exchange(_data_distribution.distribution_type) &&
@@ -71,7 +73,6 @@ bool Pipeline::need_to_local_exchange(const DataDistribution target_data_distrib
Status Pipeline::add_operator(OperatorPtr& op, const int parallelism) {
if (parallelism > 0 && op->is_serial_operator()) {
set_num_tasks(parallelism);
- op->set_ignore_data_distribution();
}
op->set_parallel_tasks(num_tasks());
_operators.emplace_back(op);
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 9554537ca16..98e52ec5271 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -115,7 +115,7 @@ public:
int num_tasks() const { return _num_tasks; }
bool close_task() { return _num_tasks_running.fetch_sub(1) == 1; }
- std::string debug_string() {
+ std::string debug_string() const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer,
"Pipeline [id: {}, _num_tasks: {}, _num_tasks_created:
{}]", _pipeline_id,
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index ef856da5135..bd45016adf5 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -236,8 +236,6 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
if (request.__isset.query_options && request.query_options.__isset.execution_timeout) {
_timeout = request.query_options.execution_timeout;
}
- _use_serial_source =
- request.fragment.__isset.use_serial_source && request.fragment.use_serial_source;
_fragment_level_profile = std::make_unique<RuntimeProfile>("PipelineContext");
_prepare_timer = ADD_TIMER(_fragment_level_profile, "PrepareTime");
@@ -704,6 +702,9 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
(followed_by_shuffled_operator || op->is_shuffled_operator()) &&
require_shuffled_data_distribution;
+ if (num_children == 0) {
+ _use_serial_source = op->is_serial_operator();
+ }
// rely on that tnodes is preorder of the plan
for (int i = 0; i < num_children; i++) {
++*node_idx;
@@ -736,8 +737,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
int idx, ObjectPool* pool, PipelinePtr cur_pipe, PipelinePtr new_pip,
DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
const std::map<int, int>& bucket_seq_to_instance_idx,
- const std::map<int, int>& shuffle_idx_to_instance_idx,
- const bool ignore_data_hash_distribution) {
+ const std::map<int, int>& shuffle_idx_to_instance_idx) {
auto& operators = cur_pipe->operators();
const auto downstream_pipeline_id = cur_pipe->id();
auto local_exchange_id = next_operator_id();
@@ -785,7 +785,6 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
case ExchangeType::BUCKET_HASH_SHUFFLE:
shared_state->exchanger = BucketShuffleExchanger::create_unique(
std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, num_buckets,
- ignore_data_hash_distribution,
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
? cast_set<int>(
_runtime_state->query_options().local_exchange_free_blocks_limit)
@@ -922,8 +921,7 @@ Status PipelineFragmentContext::_add_local_exchange(
int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr cur_pipe,
DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
const std::map<int, int>& bucket_seq_to_instance_idx,
- const std::map<int, int>& shuffle_idx_to_instance_idx,
- const bool ignore_data_distribution) {
+ const std::map<int, int>& shuffle_idx_to_instance_idx) {
if (_num_instances <= 1 || cur_pipe->num_tasks_of_parent() <= 1) {
return Status::OK();
}
@@ -938,7 +936,7 @@ Status PipelineFragmentContext::_add_local_exchange(
auto new_pip = add_pipeline(cur_pipe, pip_idx + 1);
RETURN_IF_ERROR(_add_local_exchange_impl(
idx, pool, cur_pipe, new_pip, data_distribution, do_local_exchange, num_buckets,
- bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx, ignore_data_distribution));
+ bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
CHECK(total_op_num + 1 == cur_pipe->operators().size() + new_pip->operators().size())
<< "total_op_num: " << total_op_num
@@ -952,7 +950,7 @@ Status PipelineFragmentContext::_add_local_exchange(
cast_set<int>(new_pip->operators().size()), pool, new_pip,
add_pipeline(new_pip, pip_idx + 2),
DataDistribution(ExchangeType::PASSTHROUGH),
do_local_exchange, num_buckets, bucket_seq_to_instance_idx,
- shuffle_idx_to_instance_idx, ignore_data_distribution));
+ shuffle_idx_to_instance_idx));
}
return Status::OK();
}
@@ -978,13 +976,8 @@ Status PipelineFragmentContext::_plan_local_exchange(
// scan node. so here use `_num_instance` to replace the `num_buckets` to prevent dividing 0
// still keep colocate plan after local shuffle
RETURN_IF_ERROR(_plan_local_exchange(
- _pipelines[pip_idx]->operators().front()->ignore_data_hash_distribution() ||
- num_buckets == 0
- ? _num_instances
- : num_buckets,
- pip_idx, _pipelines[pip_idx], bucket_seq_to_instance_idx,
- shuffle_idx_to_instance_idx,
- _pipelines[pip_idx]->operators().front()->ignore_data_hash_distribution()));
+ _use_serial_source || num_buckets == 0 ? _num_instances : num_buckets, pip_idx,
+ _pipelines[pip_idx], bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
}
return Status::OK();
}
@@ -992,8 +985,7 @@ Status PipelineFragmentContext::_plan_local_exchange(
Status PipelineFragmentContext::_plan_local_exchange(
int num_buckets, int pip_idx, PipelinePtr pip,
const std::map<int, int>& bucket_seq_to_instance_idx,
- const std::map<int, int>& shuffle_idx_to_instance_idx,
- const bool ignore_data_hash_distribution) {
+ const std::map<int, int>& shuffle_idx_to_instance_idx) {
int idx = 1;
bool do_local_exchange = false;
do {
@@ -1005,8 +997,7 @@ Status PipelineFragmentContext::_plan_local_exchange(
RETURN_IF_ERROR(_add_local_exchange(
pip_idx, idx, ops[idx]->node_id(),
_runtime_state->obj_pool(), pip,
ops[idx]->required_data_distribution(),
&do_local_exchange, num_buckets,
- bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx,
- ignore_data_hash_distribution));
+ bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
}
if (do_local_exchange) {
// If local exchange is needed for current operator, we will split this pipeline to
@@ -1023,8 +1014,7 @@ Status PipelineFragmentContext::_plan_local_exchange(
RETURN_IF_ERROR(_add_local_exchange(
pip_idx, idx, pip->sink()->node_id(), _runtime_state->obj_pool(), pip,
pip->sink()->required_data_distribution(), &do_local_exchange, num_buckets,
- bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx,
- ignore_data_hash_distribution));
+ bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
}
return Status::OK();
}
@@ -1215,10 +1205,6 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
enable_query_cache ? request.fragment.query_cache_param : TQueryCacheParam {}));
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
- if (request.__isset.parallel_instances) {
- cur_pipe->set_num_tasks(request.parallel_instances);
- op->set_ignore_data_distribution();
- }
break;
}
case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: {
@@ -1229,10 +1215,6 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
op.reset(new GroupCommitOperatorX(pool, tnode, next_operator_id(), descs, _num_instances));
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
- if (request.__isset.parallel_instances) {
- cur_pipe->set_num_tasks(request.parallel_instances);
- op->set_ignore_data_distribution();
- }
break;
}
case doris::TPlanNodeType::JDBC_SCAN_NODE: {
@@ -1245,20 +1227,12 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
"Jdbc scan node is disabled, you can change be config enable_java_support "
"to true and restart be.");
}
- if (request.__isset.parallel_instances) {
- cur_pipe->set_num_tasks(request.parallel_instances);
- op->set_ignore_data_distribution();
- }
break;
}
case doris::TPlanNodeType::FILE_SCAN_NODE: {
op.reset(new FileScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances));
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
- if (request.__isset.parallel_instances) {
- cur_pipe->set_num_tasks(request.parallel_instances);
- op->set_ignore_data_distribution();
- }
break;
}
case TPlanNodeType::ES_SCAN_NODE:
@@ -1266,10 +1240,6 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
op.reset(new EsScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances));
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
- if (request.__isset.parallel_instances) {
- cur_pipe->set_num_tasks(request.parallel_instances);
- op->set_ignore_data_distribution();
- }
break;
}
case TPlanNodeType::EXCHANGE_NODE: {
@@ -1278,10 +1248,6 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
op.reset(new ExchangeSourceOperatorX(pool, tnode, next_operator_id(), descs, num_senders));
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
- if (request.__isset.parallel_instances) {
- op->set_ignore_data_distribution();
- cur_pipe->set_num_tasks(request.parallel_instances);
- }
break;
}
case TPlanNodeType::AGGREGATION_NODE: {
@@ -1643,10 +1609,6 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
op.reset(new DataGenSourceOperatorX(pool, tnode, next_operator_id(), descs));
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
- if (request.__isset.parallel_instances) {
- cur_pipe->set_num_tasks(request.parallel_instances);
- op->set_ignore_data_distribution();
- }
break;
}
case TPlanNodeType::SCHEMA_SCAN_NODE: {
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 6caa0e5c106..289f5c82365 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -153,22 +153,19 @@ private:
const std::map<int, int>& shuffle_idx_to_instance_idx);
Status _plan_local_exchange(int num_buckets, int pip_idx, PipelinePtr pip,
const std::map<int, int>& bucket_seq_to_instance_idx,
- const std::map<int, int>& shuffle_idx_to_instance_idx,
- const bool ignore_data_distribution);
+ const std::map<int, int>& shuffle_idx_to_instance_idx);
void _inherit_pipeline_properties(const DataDistribution& data_distribution,
PipelinePtr pipe_with_source,
PipelinePtr pipe_with_sink);
Status _add_local_exchange(int pip_idx, int idx, int node_id, ObjectPool* pool,
PipelinePtr cur_pipe, DataDistribution data_distribution,
bool* do_local_exchange, int num_buckets,
const std::map<int, int>& bucket_seq_to_instance_idx,
- const std::map<int, int>& shuffle_idx_to_instance_idx,
- const bool ignore_data_distribution);
+ const std::map<int, int>& shuffle_idx_to_instance_idx);
Status _add_local_exchange_impl(int idx, ObjectPool* pool, PipelinePtr cur_pipe,
PipelinePtr new_pip, DataDistribution data_distribution,
bool* do_local_exchange, int num_buckets,
const std::map<int, int>& bucket_seq_to_instance_idx,
- const std::map<int, int>& shuffle_idx_to_instance_idx,
- const bool ignore_data_hash_distribution);
+ const std::map<int, int>& shuffle_idx_to_instance_idx);
Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request,
ThreadPool* thread_pool);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
index c5a6ec55f63..0ebd023ed41 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
@@ -342,7 +342,6 @@ public class PlanFragment extends TreeNode<PlanFragment> {
// TODO chenhao , calculated by cost
result.setMinReservationBytes(0);
result.setInitialReservationTotalClaims(0);
- result.setUseSerialSource(useSerialSource(ConnectContext.get()));
return result;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 0c755b9aae9..52ea334a142 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -4351,7 +4351,7 @@ public class SessionVariable implements Serializable, Writable {
}
public boolean isIgnoreStorageDataDistribution() {
- return ignoreStorageDataDistribution && enableLocalShuffle;
+ return ignoreStorageDataDistribution && enableLocalShuffle && enableNereidsPlanner;
}
public void setIgnoreStorageDataDistribution(boolean ignoreStorageDataDistribution) {
@@ -4389,7 +4389,7 @@ public class SessionVariable implements Serializable, Writable {
}
public boolean isForceToLocalShuffle() {
- return enableLocalShuffle && forceToLocalShuffle;
+ return enableLocalShuffle && forceToLocalShuffle && enableNereidsPlanner;
}
public void setForceToLocalShuffle(boolean forceToLocalShuffle) {
diff --git a/gensrc/thrift/Planner.thrift b/gensrc/thrift/Planner.thrift
index ffcc33638db..866d8d45320 100644
--- a/gensrc/thrift/Planner.thrift
+++ b/gensrc/thrift/Planner.thrift
@@ -64,10 +64,6 @@ struct TPlanFragment {
8: optional i64 initial_reservation_total_claims
9: optional QueryCache.TQueryCacheParam query_cache_param
-
- // Using serial source means a serial source operator will be used in this fragment (e.g. data will be shuffled to
- // only 1 exchange operator) and then splitted by followed local exchanger
- 10: optional bool use_serial_source
}
// location information for a single scan range
diff --git a/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy b/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy
index 950b6171c7c..d701ad890d6 100644
--- a/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy
+++ b/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy
@@ -45,7 +45,7 @@ suite("local_shuffle") {
insert into test_local_shuffle1 values (1, 1), (2, 2);
insert into test_local_shuffle2 values (2, 2), (3, 3);
- set enable_nereids_distribute_planner=true;
+ // set enable_nereids_distribute_planner=true;
set enable_pipeline_x_engine=true;
set disable_join_reorder=true;
set enable_local_shuffle=true;