This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 7769c68d682 [branch-3.0](pick) Pick #41789 #42482 #41210 #42460 (#42914)
7769c68d682 is described below
commit 7769c68d6829170b560a77f3e536b0e6978d4a58
Author: Gabriel <[email protected]>
AuthorDate: Thu Oct 31 17:31:41 2024 +0800
[branch-3.0](pick) Pick #41789 #42482 #41210 #42460 (#42914)
---
be/src/pipeline/dependency.h | 8 +-
be/src/pipeline/exec/aggregation_sink_operator.cpp | 5 +-
be/src/pipeline/exec/aggregation_sink_operator.h | 8 +-
.../pipeline/exec/aggregation_source_operator.cpp | 4 +-
be/src/pipeline/exec/analytic_sink_operator.cpp | 4 +-
be/src/pipeline/exec/analytic_sink_operator.h | 3 -
be/src/pipeline/exec/analytic_source_operator.cpp | 1 +
be/src/pipeline/exec/assert_num_rows_operator.cpp | 1 +
be/src/pipeline/exec/datagen_operator.cpp | 8 +-
.../distinct_streaming_aggregation_operator.cpp | 4 +-
.../exec/distinct_streaming_aggregation_operator.h | 7 +-
be/src/pipeline/exec/exchange_sink_operator.h | 1 +
be/src/pipeline/exec/exchange_source_operator.cpp | 4 +-
be/src/pipeline/exec/exchange_source_operator.h | 2 +-
be/src/pipeline/exec/hashjoin_build_sink.h | 7 +-
be/src/pipeline/exec/hashjoin_probe_operator.h | 3 -
be/src/pipeline/exec/join_build_sink_operator.cpp | 2 +
be/src/pipeline/exec/join_probe_operator.cpp | 1 +
.../exec/nested_loop_join_build_operator.h | 4 +-
.../exec/nested_loop_join_probe_operator.h | 4 +-
be/src/pipeline/exec/operator.cpp | 19 +-
be/src/pipeline/exec/operator.h | 27 +-
.../exec/partitioned_aggregation_sink_operator.h | 3 -
.../partitioned_aggregation_source_operator.cpp | 4 +
.../exec/partitioned_aggregation_source_operator.h | 2 +
.../exec/partitioned_hash_join_probe_operator.h | 3 -
.../exec/partitioned_hash_join_sink_operator.h | 3 -
be/src/pipeline/exec/scan_operator.cpp | 6 +-
be/src/pipeline/exec/scan_operator.h | 4 +-
be/src/pipeline/exec/set_probe_sink_operator.h | 2 -
be/src/pipeline/exec/set_sink_operator.h | 1 -
be/src/pipeline/exec/sort_sink_operator.cpp | 4 +-
be/src/pipeline/exec/sort_sink_operator.h | 4 +-
be/src/pipeline/exec/sort_source_operator.cpp | 4 +-
be/src/pipeline/exec/union_source_operator.h | 4 +-
.../local_exchange_sink_operator.cpp | 6 +-
.../local_exchange/local_exchange_sink_operator.h | 2 +-
.../local_exchange_source_operator.h | 3 -
be/src/pipeline/local_exchange/local_exchanger.cpp | 2 +-
be/src/pipeline/local_exchange/local_exchanger.h | 11 +-
be/src/pipeline/pipeline.cpp | 44 ++-
be/src/pipeline/pipeline.h | 46 ++-
be/src/pipeline/pipeline_fragment_context.cpp | 322 ++++++++++-----------
be/src/pipeline/pipeline_fragment_context.h | 38 ++-
.../aggregate_function_window.h | 7 +-
.../org/apache/doris/planner/AggregationNode.java | 6 +
.../org/apache/doris/planner/AnalyticEvalNode.java | 11 +
.../apache/doris/planner/AssertNumRowsNode.java | 5 +
.../org/apache/doris/planner/DataPartition.java | 4 +
.../org/apache/doris/planner/EmptySetNode.java | 1 -
.../org/apache/doris/planner/ExchangeNode.java | 29 ++
.../org/apache/doris/planner/JoinNodeBase.java | 1 -
.../apache/doris/planner/NestedLoopJoinNode.java | 15 +
.../org/apache/doris/planner/PlanFragment.java | 20 ++
.../java/org/apache/doris/planner/PlanNode.java | 14 +
.../java/org/apache/doris/planner/RepeatNode.java | 6 +
.../java/org/apache/doris/planner/ScanNode.java | 7 +
.../java/org/apache/doris/planner/SelectNode.java | 6 +
.../java/org/apache/doris/planner/SortNode.java | 6 +
.../java/org/apache/doris/planner/UnionNode.java | 7 +
.../main/java/org/apache/doris/qe/Coordinator.java | 37 ++-
.../java/org/apache/doris/qe/SessionVariable.java | 4 +-
gensrc/thrift/PlanNodes.thrift | 1 +
.../insert_into_table/complex_insert.groovy | 6 +-
.../distribute/local_shuffle.groovy | 14 +-
65 files changed, 505 insertions(+), 347 deletions(-)
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 9364170898d..619dd2d2aa3 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -110,19 +110,19 @@ public:
// Notify downstream pipeline tasks this dependency is ready.
void set_ready();
void set_ready_to_read() {
- DCHECK(_shared_state->source_deps.size() == 1) << debug_string();
+ DCHECK_EQ(_shared_state->source_deps.size(), 1) << debug_string();
_shared_state->source_deps.front()->set_ready();
}
void set_block_to_read() {
- DCHECK(_shared_state->source_deps.size() == 1) << debug_string();
+ DCHECK_EQ(_shared_state->source_deps.size(), 1) << debug_string();
_shared_state->source_deps.front()->block();
}
void set_ready_to_write() {
- DCHECK(_shared_state->sink_deps.size() == 1) << debug_string();
+ DCHECK_EQ(_shared_state->sink_deps.size(), 1) << debug_string();
_shared_state->sink_deps.front()->set_ready();
}
void set_block_to_write() {
- DCHECK(_shared_state->sink_deps.size() == 1) << debug_string();
+ DCHECK_EQ(_shared_state->sink_deps.size(), 1) << debug_string();
_shared_state->sink_deps.front()->block();
}
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 260a599a947..f7585e50422 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -725,7 +725,10 @@ AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPla
: tnode.agg_node.grouping_exprs),
_is_colocate(tnode.agg_node.__isset.is_colocate &&
tnode.agg_node.is_colocate),
_require_bucket_distribution(require_bucket_distribution),
- _agg_fn_output_row_descriptor(descs, tnode.row_tuples,
tnode.nullable_tuples) {}
+ _agg_fn_output_row_descriptor(descs, tnode.row_tuples,
tnode.nullable_tuples),
+ _without_key(tnode.agg_node.grouping_exprs.empty()) {
+ _is_serial_operator = tnode.__isset.is_serial_operator &&
tnode.is_serial_operator;
+}
Status AggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<AggSinkLocalState>::init(tnode, state));
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h
index 7c146c38a2b..0b78ab15d2a 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -143,9 +143,8 @@ public:
DataDistribution required_data_distribution() const override {
if (_probe_expr_ctxs.empty()) {
- return _needs_finalize ||
DataSinkOperatorX<AggSinkLocalState>::_child
- ->ignore_data_distribution()
- ? DataDistribution(ExchangeType::PASSTHROUGH)
+ return _needs_finalize
+ ? DataDistribution(ExchangeType::NOOP)
:
DataSinkOperatorX<AggSinkLocalState>::required_data_distribution();
}
return _is_colocate && _require_bucket_distribution &&
!_followed_by_shuffled_operator
@@ -153,7 +152,6 @@ public:
: DataDistribution(ExchangeType::HASH_SHUFFLE,
_partition_exprs);
}
bool require_data_distribution() const override { return _is_colocate; }
- bool require_shuffled_data_distribution() const override { return
!_probe_expr_ctxs.empty(); }
size_t get_revocable_mem_size(RuntimeState* state) const;
AggregatedDataVariants* get_agg_data(RuntimeState* state) {
@@ -204,8 +202,8 @@ protected:
const std::vector<TExpr> _partition_exprs;
const bool _is_colocate;
const bool _require_bucket_distribution;
-
RowDescriptor _agg_fn_output_row_descriptor;
+ const bool _without_key;
};
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp
index fe03eba4102..9cc35923d08 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -440,7 +440,9 @@ AggSourceOperatorX::AggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs)
: Base(pool, tnode, operator_id, descs),
_needs_finalize(tnode.agg_node.need_finalize),
- _without_key(tnode.agg_node.grouping_exprs.empty()) {}
+ _without_key(tnode.agg_node.grouping_exprs.empty()) {
+ _is_serial_operator = tnode.__isset.is_serial_operator &&
tnode.is_serial_operator;
+}
Status AggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block*
block, bool* eos) {
auto& local_state = get_local_state(state);
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp
index 85d7773bdbd..11652db0a85 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -201,7 +201,9 @@ AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* pool, int operator_id,
_require_bucket_distribution(require_bucket_distribution),
_partition_exprs(tnode.__isset.distribute_expr_lists &&
require_bucket_distribution
? tnode.distribute_expr_lists[0]
- : tnode.analytic_node.partition_exprs) {}
+ : tnode.analytic_node.partition_exprs) {
+ _is_serial_operator = tnode.__isset.is_serial_operator &&
tnode.is_serial_operator;
+}
Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, RuntimeState*
state) {
RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h
index ee305a877f5..7ef650c4383 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -88,9 +88,6 @@ public:
}
bool require_data_distribution() const override { return true; }
- bool require_shuffled_data_distribution() const override {
- return !_partition_by_eq_expr_ctxs.empty();
- }
private:
Status _insert_range_column(vectorized::Block* block, const
vectorized::VExprContextSPtr& expr,
diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp b/be/src/pipeline/exec/analytic_source_operator.cpp
index b9e48727656..866e5b8119e 100644
--- a/be/src/pipeline/exec/analytic_source_operator.cpp
+++ b/be/src/pipeline/exec/analytic_source_operator.cpp
@@ -475,6 +475,7 @@ AnalyticSourceOperatorX::AnalyticSourceOperatorX(ObjectPool* pool, const TPlanNo
_has_range_window(tnode.analytic_node.window.type ==
TAnalyticWindowType::RANGE),
_has_window_start(tnode.analytic_node.window.__isset.window_start),
_has_window_end(tnode.analytic_node.window.__isset.window_end) {
+ _is_serial_operator = tnode.__isset.is_serial_operator &&
tnode.is_serial_operator;
_fn_scope = AnalyticFnScope::PARTITION;
if (tnode.analytic_node.__isset.window &&
tnode.analytic_node.window.type == TAnalyticWindowType::RANGE) {
diff --git a/be/src/pipeline/exec/assert_num_rows_operator.cpp b/be/src/pipeline/exec/assert_num_rows_operator.cpp
index 5aa27b51c45..563c4bf49ca 100644
--- a/be/src/pipeline/exec/assert_num_rows_operator.cpp
+++ b/be/src/pipeline/exec/assert_num_rows_operator.cpp
@@ -27,6 +27,7 @@ AssertNumRowsOperatorX::AssertNumRowsOperatorX(ObjectPool* pool, const TPlanNode
: StreamingOperatorX<AssertNumRowsLocalState>(pool, tnode,
operator_id, descs),
_desired_num_rows(tnode.assert_num_rows_node.desired_num_rows),
_subquery_string(tnode.assert_num_rows_node.subquery_string) {
+ _is_serial_operator = true;
if (tnode.assert_num_rows_node.__isset.assertion) {
_assertion = tnode.assert_num_rows_node.assertion;
} else {
diff --git a/be/src/pipeline/exec/datagen_operator.cpp b/be/src/pipeline/exec/datagen_operator.cpp
index 93b3d058154..ac86db03d19 100644
--- a/be/src/pipeline/exec/datagen_operator.cpp
+++ b/be/src/pipeline/exec/datagen_operator.cpp
@@ -36,7 +36,9 @@ DataGenSourceOperatorX::DataGenSourceOperatorX(ObjectPool* pool, const TPlanNode
: OperatorX<DataGenLocalState>(pool, tnode, operator_id, descs),
_tuple_id(tnode.data_gen_scan_node.tuple_id),
_tuple_desc(nullptr),
- _runtime_filter_descs(tnode.runtime_filters) {}
+ _runtime_filter_descs(tnode.runtime_filters) {
+ _is_serial_operator = tnode.__isset.is_serial_operator &&
tnode.is_serial_operator;
+}
Status DataGenSourceOperatorX::init(const TPlanNode& tnode, RuntimeState*
state) {
RETURN_IF_ERROR(OperatorX<DataGenLocalState>::init(tnode, state));
@@ -87,8 +89,8 @@ Status DataGenLocalState::init(RuntimeState* state, LocalStateInfo& info) {
// TODO: use runtime filter to filte result block, maybe this node need
derive from vscan_node.
for (const auto& filter_desc : p._runtime_filter_descs) {
std::shared_ptr<IRuntimeFilter> runtime_filter;
- RETURN_IF_ERROR(state->register_consumer_runtime_filter(
- filter_desc, p.ignore_data_distribution(), p.node_id(),
&runtime_filter));
+ RETURN_IF_ERROR(state->register_consumer_runtime_filter(filter_desc,
p.is_serial_operator(),
+ p.node_id(),
&runtime_filter));
runtime_filter->init_profile(_runtime_profile.get());
}
return Status::OK();
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index 70b73225f06..f61602f576b 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -334,7 +334,9 @@ DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, i
? tnode.distribute_expr_lists[0]
: tnode.agg_node.grouping_exprs),
_is_colocate(tnode.agg_node.__isset.is_colocate &&
tnode.agg_node.is_colocate),
- _require_bucket_distribution(require_bucket_distribution) {
+ _require_bucket_distribution(require_bucket_distribution),
+ _without_key(tnode.agg_node.grouping_exprs.empty()) {
+ _is_serial_operator = tnode.__isset.is_serial_operator &&
tnode.is_serial_operator;
if (tnode.agg_node.__isset.use_streaming_preaggregation) {
_is_streaming_preagg = tnode.agg_node.use_streaming_preaggregation;
if (_is_streaming_preagg) {
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
index edeb4321763..97df1a6fcbe 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
@@ -104,6 +104,9 @@ public:
bool need_more_input_data(RuntimeState* state) const override;
DataDistribution required_data_distribution() const override {
+ if (_needs_finalize && _probe_expr_ctxs.empty()) {
+ return {ExchangeType::NOOP};
+ }
if (_needs_finalize || (!_probe_expr_ctxs.empty() &&
!_is_streaming_preagg)) {
return _is_colocate && _require_bucket_distribution &&
!_followed_by_shuffled_operator
?
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
@@ -113,9 +116,6 @@ public:
}
bool require_data_distribution() const override { return _is_colocate; }
- bool require_shuffled_data_distribution() const override {
- return _needs_finalize || (!_probe_expr_ctxs.empty() &&
!_is_streaming_preagg);
- }
private:
friend class DistinctStreamingAggLocalState;
@@ -136,6 +136,7 @@ private:
/// The total size of the row from the aggregate functions.
size_t _total_size_of_aggregate_states = 0;
bool _is_streaming_preagg = false;
+ const bool _without_key;
};
} // namespace pipeline
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index c60cefabfa8..3bd905fdde6 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -221,6 +221,7 @@ public:
Status serialize_block(ExchangeSinkLocalState& stete, vectorized::Block*
src, PBlock* dest,
int num_receivers = 1);
DataDistribution required_data_distribution() const override;
+ bool is_serial_operator() const override { return true; }
private:
friend class ExchangeSinkLocalState;
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp
index cf2055ec47b..0f72b197a51 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -104,7 +104,9 @@ ExchangeSourceOperatorX::ExchangeSourceOperatorX(ObjectPool* pool, const TPlanNo
std::vector<bool>(tnode.nullable_tuples.begin(),
tnode.nullable_tuples.begin() +
tnode.exchange_node.input_row_tuples.size())),
- _offset(tnode.exchange_node.__isset.offset ?
tnode.exchange_node.offset : 0) {}
+ _offset(tnode.exchange_node.__isset.offset ?
tnode.exchange_node.offset : 0) {
+ _is_serial_operator = tnode.__isset.is_serial_operator &&
tnode.is_serial_operator;
+}
Status ExchangeSourceOperatorX::init(const TPlanNode& tnode, RuntimeState*
state) {
RETURN_IF_ERROR(OperatorX<ExchangeLocalState>::init(tnode, state));
diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h
index 0fe3dcbb590..c8ef674d269 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -81,7 +81,7 @@ public:
[[nodiscard]] bool is_merging() const { return _is_merging; }
DataDistribution required_data_distribution() const override {
- if (OperatorX<ExchangeLocalState>::ignore_data_distribution()) {
+ if (OperatorX<ExchangeLocalState>::is_serial_operator()) {
return {ExchangeType::NOOP};
}
return _partition_type == TPartitionType::HASH_PARTITIONED
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h
index 1ae9d5ae1a7..c56fb9bc9b1 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -132,8 +132,8 @@ public:
if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
return {ExchangeType::NOOP};
} else if (_is_broadcast_join) {
- return _child->ignore_data_distribution() ?
DataDistribution(ExchangeType::PASS_TO_ONE)
- :
DataDistribution(ExchangeType::NOOP);
+ return _child->is_serial_operator() ?
DataDistribution(ExchangeType::PASS_TO_ONE)
+ :
DataDistribution(ExchangeType::NOOP);
}
return _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
_join_distribution ==
TJoinDistributionType::COLOCATE
@@ -141,9 +141,6 @@ public:
: DataDistribution(ExchangeType::HASH_SHUFFLE,
_partition_exprs);
}
- bool require_shuffled_data_distribution() const override {
- return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
!_is_broadcast_join;
- }
bool is_shuffled_operator() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h
index dde9c00dfe4..ad7e0d284cb 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -152,9 +152,6 @@ public:
:
DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs));
}
- bool require_shuffled_data_distribution() const override {
- return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
!_is_broadcast_join;
- }
bool is_shuffled_operator() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}
diff --git a/be/src/pipeline/exec/join_build_sink_operator.cpp b/be/src/pipeline/exec/join_build_sink_operator.cpp
index 2439dbc8fe1..a6f5f0f650a 100644
--- a/be/src/pipeline/exec/join_build_sink_operator.cpp
+++ b/be/src/pipeline/exec/join_build_sink_operator.cpp
@@ -82,6 +82,8 @@ JoinBuildSinkOperatorX<LocalStateType>::JoinBuildSinkOperatorX(ObjectPool* pool,
_short_circuit_for_null_in_build_side(_join_op ==
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
!_is_mark_join),
_runtime_filter_descs(tnode.runtime_filters) {
+ DataSinkOperatorX<LocalStateType>::_is_serial_operator =
+ tnode.__isset.is_serial_operator && tnode.is_serial_operator;
_init_join_op();
if (_is_mark_join) {
DCHECK(_join_op == TJoinOp::LEFT_ANTI_JOIN || _join_op ==
TJoinOp::LEFT_SEMI_JOIN ||
diff --git a/be/src/pipeline/exec/join_probe_operator.cpp b/be/src/pipeline/exec/join_probe_operator.cpp
index 05c62544d2b..53dcb2c4cfc 100644
--- a/be/src/pipeline/exec/join_probe_operator.cpp
+++ b/be/src/pipeline/exec/join_probe_operator.cpp
@@ -220,6 +220,7 @@ JoinProbeOperatorX<LocalStateType>::JoinProbeOperatorX(ObjectPool* pool, const T
: true)
) {
+ Base::_is_serial_operator = tnode.__isset.is_serial_operator &&
tnode.is_serial_operator;
if (tnode.__isset.hash_join_node) {
_intermediate_row_desc.reset(new RowDescriptor(
descs, tnode.hash_join_node.vintermediate_tuple_id_list,
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.h b/be/src/pipeline/exec/nested_loop_join_build_operator.h
index f2ca259754b..d6e72799f97 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.h
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.h
@@ -76,8 +76,8 @@ public:
if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
return {ExchangeType::NOOP};
}
- return _child->ignore_data_distribution() ?
DataDistribution(ExchangeType::BROADCAST)
- :
DataDistribution(ExchangeType::NOOP);
+ return _child->is_serial_operator() ?
DataDistribution(ExchangeType::BROADCAST)
+ :
DataDistribution(ExchangeType::NOOP);
}
private:
diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.h b/be/src/pipeline/exec/nested_loop_join_probe_operator.h
index f46a99306a5..982498c7e2e 100644
--- a/be/src/pipeline/exec/nested_loop_join_probe_operator.h
+++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.h
@@ -197,7 +197,9 @@ public:
}
DataDistribution required_data_distribution() const override {
- if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
+ if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
+ _join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op ==
TJoinOp::RIGHT_ANTI_JOIN ||
+ _join_op == TJoinOp::RIGHT_SEMI_JOIN || _join_op ==
TJoinOp::FULL_OUTER_JOIN) {
return {ExchangeType::NOOP};
}
return {ExchangeType::ADAPTIVE_PASSTHROUGH};
diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp
index 4a93bac67fe..68ddd250019 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -74,6 +74,7 @@
#include "pipeline/exec/union_source_operator.h"
#include "pipeline/local_exchange/local_exchange_sink_operator.h"
#include "pipeline/local_exchange/local_exchange_source_operator.h"
+#include "pipeline/pipeline.h"
#include "util/debug_util.h"
#include "util/runtime_profile.h"
#include "util/string_util.h"
@@ -116,11 +117,16 @@ std::string PipelineXSinkLocalState<SharedStateArg>::name_suffix() {
}() + ")";
}
-DataDistribution DataSinkOperatorXBase::required_data_distribution() const {
- return _child && _child->ignore_data_distribution()
+DataDistribution OperatorBase::required_data_distribution() const {
+ return _child && _child->is_serial_operator() && !is_source()
? DataDistribution(ExchangeType::PASSTHROUGH)
: DataDistribution(ExchangeType::NOOP);
}
+
+bool OperatorBase::require_shuffled_data_distribution() const {
+ return
Pipeline::is_hash_exchange(required_data_distribution().distribution_type);
+}
+
const RowDescriptor& OperatorBase::row_desc() const {
return _child->row_desc();
}
@@ -141,8 +147,9 @@ std::string PipelineXSinkLocalState<SharedStateArg>::debug_string(int indentatio
std::string OperatorXBase::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
- fmt::format_to(debug_string_buffer, "{}{}: id={}, parallel_tasks={}",
- std::string(indentation_level * 2, ' '), _op_name,
node_id(), _parallel_tasks);
+ fmt::format_to(debug_string_buffer, "{}{}: id={}, parallel_tasks={},
_is_serial_operator={}",
+ std::string(indentation_level * 2, ' '), _op_name,
node_id(), _parallel_tasks,
+ _is_serial_operator);
return fmt::to_string(debug_string_buffer);
}
@@ -354,8 +361,8 @@ void PipelineXLocalStateBase::reached_limit(vectorized::Block* block, bool* eos)
std::string DataSinkOperatorXBase::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
- fmt::format_to(debug_string_buffer, "{}{}: id={}",
std::string(indentation_level * 2, ' '),
- _name, node_id());
+ fmt::format_to(debug_string_buffer, "{}{}: id={}, _is_serial_operator={}",
+ std::string(indentation_level * 2, ' '), _name, node_id(),
_is_serial_operator);
return fmt::to_string(debug_string_buffer);
}
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index b848aea6e1e..38cee083e2e 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -101,6 +101,9 @@ public:
return Status::OK();
}
+ // Operators need to be executed serially. (e.g. finalized agg without key)
+ [[nodiscard]] virtual bool is_serial_operator() const { return
_is_serial_operator; }
+
[[nodiscard]] bool is_closed() const { return _is_closed; }
virtual size_t revocable_mem_size(RuntimeState* state) const { return 0; }
@@ -115,13 +118,15 @@ public:
_followed_by_shuffled_operator = followed_by_shuffled_operator;
}
[[nodiscard]] virtual bool is_shuffled_operator() const { return false; }
- [[nodiscard]] virtual bool require_shuffled_data_distribution() const {
return false; }
+ [[nodiscard]] virtual DataDistribution required_data_distribution() const;
+ [[nodiscard]] virtual bool require_shuffled_data_distribution() const;
protected:
OperatorPtr _child = nullptr;
bool _is_closed;
bool _followed_by_shuffled_operator = false;
+ bool _is_serial_operator = false;
};
class PipelineXLocalStateBase {
@@ -443,7 +448,7 @@ public:
Status init(const TDataSink& tsink) override;
[[nodiscard]] virtual Status init(ExchangeType type, const int num_buckets,
- const bool is_shuffled_hash_join,
+ const bool use_global_hash_shuffle,
const std::map<int, int>&
shuffle_idx_to_instance_idx) {
return Status::InternalError("init() is only implemented in local
exchange!");
}
@@ -478,7 +483,6 @@ public:
}
[[nodiscard]] virtual std::shared_ptr<BasicSharedState>
create_shared_state() const = 0;
- [[nodiscard]] virtual DataDistribution required_data_distribution() const;
Status close(RuntimeState* state) override {
return Status::InternalError("Should not reach here!");
@@ -491,8 +495,6 @@ public:
[[nodiscard]] bool is_sink() const override { return true; }
- [[nodiscard]] bool is_source() const override { return false; }
-
static Status close(RuntimeState* state, Status exec_status) {
auto result = state->get_sink_local_state_result();
if (!result) {
@@ -647,19 +649,7 @@ public:
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, _op_name);
}
[[nodiscard]] std::string get_name() const override { return _op_name; }
- [[nodiscard]] virtual DataDistribution required_data_distribution() const {
- return _child && _child->ignore_data_distribution() && !is_source()
- ? DataDistribution(ExchangeType::PASSTHROUGH)
- : DataDistribution(ExchangeType::NOOP);
- }
- [[nodiscard]] virtual bool ignore_data_distribution() const {
- return _child ? _child->ignore_data_distribution() :
_ignore_data_distribution;
- }
- [[nodiscard]] bool ignore_data_hash_distribution() const {
- return _child ? _child->ignore_data_hash_distribution() :
_ignore_data_distribution;
- }
[[nodiscard]] virtual bool need_more_input_data(RuntimeState* state) const
{ return true; }
- void set_ignore_data_distribution() { _ignore_data_distribution = true; }
Status open(RuntimeState* state) override;
@@ -730,8 +720,6 @@ public:
bool has_output_row_desc() const { return _output_row_descriptor !=
nullptr; }
- [[nodiscard]] bool is_source() const override { return false; }
-
[[nodiscard]] virtual Status get_block_after_projects(RuntimeState* state,
vectorized::Block*
block, bool* eos);
@@ -774,7 +762,6 @@ protected:
uint32_t _debug_point_count = 0;
std::string _op_name;
- bool _ignore_data_distribution = false;
int _parallel_tasks = 0;
//_keep_origin is used to avoid copying during projection,
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 6b3a74c83df..15f6b22387a 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -309,9 +309,6 @@ public:
bool require_data_distribution() const override {
return _agg_sink_operator->require_data_distribution();
}
- bool require_shuffled_data_distribution() const override {
- return _agg_sink_operator->require_shuffled_data_distribution();
- }
Status set_child(OperatorPtr child) override {
RETURN_IF_ERROR(DataSinkOperatorX<PartitionedAggSinkLocalState>::set_child(child));
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index 48df5587198..655a6e19725 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -118,6 +118,10 @@ Status PartitionedAggSourceOperatorX::close(RuntimeState* state) {
return _agg_source_operator->close(state);
}
+bool PartitionedAggSourceOperatorX::is_serial_operator() const {
+ return _agg_source_operator->is_serial_operator();
+}
+
Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state,
vectorized::Block* block,
bool* eos) {
auto& local_state = get_local_state(state);
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
index edae99c716a..7e73241745e 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
@@ -91,6 +91,8 @@ public:
bool is_source() const override { return true; }
+ bool is_serial_operator() const override;
+
private:
friend class PartitionedAggLocalState;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index 3aab11f62d8..f8fc0780b6f 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -165,9 +165,6 @@ public:
_distribution_partition_exprs));
}
- bool require_shuffled_data_distribution() const override {
- return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
- }
bool is_shuffled_operator() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index c768d7518b9..8e89763b50a 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -115,9 +115,6 @@ public:
_distribution_partition_exprs);
}
- bool require_shuffled_data_distribution() const override {
- return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
- }
bool is_shuffled_operator() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}
diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index 88ac16f9740..cf723cd8732 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -70,7 +70,7 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info)
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
auto& p = _parent->cast<typename Derived::Parent>();
- RETURN_IF_ERROR(RuntimeFilterConsumer::init(state,
p.ignore_data_distribution()));
+ RETURN_IF_ERROR(RuntimeFilterConsumer::init(state,
p.is_serial_operator()));
// init profile for runtime filter
RuntimeFilterConsumer::_init_profile(profile());
init_runtime_filter_dependency(_filter_dependencies, p.operator_id(),
p.node_id(),
@@ -994,7 +994,7 @@ Status ScanLocalState<Derived>::_start_scanners(
auto& p = _parent->cast<typename Derived::Parent>();
_scanner_ctx = vectorized::ScannerContext::create_shared(
state(), this, p._output_tuple_desc, p.output_row_descriptor(),
scanners, p.limit(),
- _scan_dependency, p.ignore_data_distribution());
+ _scan_dependency, p.is_serial_operator());
return Status::OK();
}
@@ -1155,6 +1155,8 @@ ScanOperatorX<LocalStateType>::ScanOperatorX(ObjectPool*
pool, const TPlanNode&
_should_run_serial = true;
}
}
+ OperatorX<LocalStateType>::_is_serial_operator =
+ tnode.__isset.is_serial_operator && tnode.is_serial_operator;
if (tnode.__isset.push_down_count) {
_push_down_count = tnode.push_down_count;
}
diff --git a/be/src/pipeline/exec/scan_operator.h
b/be/src/pipeline/exec/scan_operator.h
index 2168ff5cb19..56234d9ea4e 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -375,8 +375,8 @@ public:
TPushAggOp::type get_push_down_agg_type() { return _push_down_agg_type; }
DataDistribution required_data_distribution() const override {
- if (OperatorX<LocalStateType>::ignore_data_distribution()) {
- // `ignore_data_distribution()` returns true means we ignore the
distribution.
+ if (OperatorX<LocalStateType>::is_serial_operator()) {
+ // `is_serial_operator()` returns true means we ignore the
distribution.
return {ExchangeType::NOOP};
}
return {ExchangeType::BUCKET_HASH_SHUFFLE};
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h
b/be/src/pipeline/exec/set_probe_sink_operator.h
index ab53f5358c2..f320c8e89cd 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.h
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -96,8 +96,6 @@ public:
: DataDistribution(ExchangeType::HASH_SHUFFLE,
_partition_exprs);
}
- bool require_shuffled_data_distribution() const override { return true; }
-
std::shared_ptr<BasicSharedState> create_shared_state() const override {
return nullptr; }
private:
diff --git a/be/src/pipeline/exec/set_sink_operator.h
b/be/src/pipeline/exec/set_sink_operator.h
index 1c08eddc141..0e76867b31f 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -93,7 +93,6 @@ public:
return _is_colocate ?
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE,
_partition_exprs);
}
- bool require_shuffled_data_distribution() const override { return true; }
private:
template <class HashTableContext, bool is_intersected>
diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp
b/be/src/pipeline/exec/sort_sink_operator.cpp
index b07942b9ab1..e20b21a0bf2 100644
--- a/be/src/pipeline/exec/sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/sort_sink_operator.cpp
@@ -90,7 +90,9 @@ SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int
operator_id, const TP
:
std::vector<TExpr> {}),
_algorithm(tnode.sort_node.__isset.algorithm ?
tnode.sort_node.algorithm
:
TSortAlgorithm::FULL_SORT),
- _reuse_mem(_algorithm != TSortAlgorithm::HEAP_SORT) {}
+ _reuse_mem(_algorithm != TSortAlgorithm::HEAP_SORT) {
+ _is_serial_operator = tnode.__isset.is_serial_operator &&
tnode.is_serial_operator;
+}
Status SortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
diff --git a/be/src/pipeline/exec/sort_sink_operator.h
b/be/src/pipeline/exec/sort_sink_operator.h
index 8462472dd02..a5a24e37163 100644
--- a/be/src/pipeline/exec/sort_sink_operator.h
+++ b/be/src/pipeline/exec/sort_sink_operator.h
@@ -69,10 +69,10 @@ public:
} else if (_merge_by_exchange) {
// The current sort node is used for the ORDER BY
return {ExchangeType::PASSTHROUGH};
+ } else {
+ return {ExchangeType::NOOP};
}
- return
DataSinkOperatorX<SortSinkLocalState>::required_data_distribution();
}
- bool require_shuffled_data_distribution() const override { return
_is_analytic_sort; }
bool require_data_distribution() const override { return _is_colocate; }
size_t get_revocable_mem_size(RuntimeState* state) const;
diff --git a/be/src/pipeline/exec/sort_source_operator.cpp
b/be/src/pipeline/exec/sort_source_operator.cpp
index 02a99e183c8..7f801b79c0b 100644
--- a/be/src/pipeline/exec/sort_source_operator.cpp
+++ b/be/src/pipeline/exec/sort_source_operator.cpp
@@ -30,7 +30,9 @@ SortSourceOperatorX::SortSourceOperatorX(ObjectPool* pool,
const TPlanNode& tnod
const DescriptorTbl& descs)
: OperatorX<SortLocalState>(pool, tnode, operator_id, descs),
_merge_by_exchange(tnode.sort_node.merge_by_exchange),
- _offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0)
{}
+ _offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0)
{
+ _is_serial_operator = tnode.__isset.is_serial_operator &&
tnode.is_serial_operator;
+}
Status SortSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(Base::init(tnode, state));
diff --git a/be/src/pipeline/exec/union_source_operator.h
b/be/src/pipeline/exec/union_source_operator.h
index 2d112ebf2df..200e7de8597 100644
--- a/be/src/pipeline/exec/union_source_operator.h
+++ b/be/src/pipeline/exec/union_source_operator.h
@@ -63,7 +63,9 @@ public:
using Base = OperatorX<UnionSourceLocalState>;
UnionSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int
operator_id,
const DescriptorTbl& descs)
- : Base(pool, tnode, operator_id, descs),
_child_size(tnode.num_children) {};
+ : Base(pool, tnode, operator_id, descs),
_child_size(tnode.num_children) {
+ _is_serial_operator = tnode.__isset.is_serial_operator &&
tnode.is_serial_operator;
+ }
~UnionSourceOperatorX() override = default;
Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos)
override;
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
index d87113ca80a..ff243186c47 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
@@ -36,17 +36,17 @@ std::vector<Dependency*>
LocalExchangeSinkLocalState::dependencies() const {
}
Status LocalExchangeSinkOperatorX::init(ExchangeType type, const int
num_buckets,
- const bool
should_disable_bucket_shuffle,
+ const bool use_global_hash_shuffle,
const std::map<int, int>&
shuffle_idx_to_instance_idx) {
_name = "LOCAL_EXCHANGE_SINK_OPERATOR (" + get_exchange_type_name(type) +
")";
_type = type;
if (_type == ExchangeType::HASH_SHUFFLE) {
- _use_global_shuffle = should_disable_bucket_shuffle;
+ _use_global_shuffle = use_global_hash_shuffle;
// For shuffle join, if data distribution has been broken by previous
operator, we
// should use a HASH_SHUFFLE local exchanger to shuffle data again. To
be mentioned,
// we should use map shuffle idx to instance idx because all instances
will be
// distributed to all BEs. Otherwise, we should use shuffle idx
directly.
- if (should_disable_bucket_shuffle) {
+ if (use_global_hash_shuffle) {
std::for_each(shuffle_idx_to_instance_idx.begin(),
shuffle_idx_to_instance_idx.end(),
[&](const auto& item) {
DCHECK(item.first != -1);
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
index 1cd9736d429..09b1f2cc310 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
@@ -100,7 +100,7 @@ public:
return Status::InternalError("{} should not init with TPlanNode",
Base::_name);
}
- Status init(ExchangeType type, const int num_buckets, const bool
should_disable_bucket_shuffle,
+ Status init(ExchangeType type, const int num_buckets, const bool
use_global_hash_shuffle,
const std::map<int, int>& shuffle_idx_to_instance_idx)
override;
Status open(RuntimeState* state) override;
diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.h
b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
index c0da5c8120c..3c706d50182 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
@@ -81,9 +81,6 @@ public:
bool is_source() const override { return true; }
- // If input data distribution is ignored by this fragment, this first
local exchange source in this fragment will re-assign all data.
- bool ignore_data_distribution() const override { return false; }
-
private:
friend class LocalExchangeSourceLocalState;
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp
b/be/src/pipeline/local_exchange/local_exchanger.cpp
index f4630f328bb..c9f98db26a9 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -226,7 +226,7 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state,
const uint32_t* __rest
new_block_wrapper->unref(local_state._shared_state,
local_state._channel_id);
}
}
- } else if (_num_senders != _num_sources ||
_ignore_source_data_distribution) {
+ } else if (_num_senders != _num_sources) {
// In this branch, data just should be distributed equally into all
instances.
new_block_wrapper->ref(_num_partitions);
for (size_t i = 0; i < _num_partitions; i++) {
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h
b/be/src/pipeline/local_exchange/local_exchanger.h
index 01b55816ba8..cc33efbb934 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -217,24 +217,21 @@ public:
protected:
ShuffleExchanger(int running_sink_operators, int num_sources, int
num_partitions,
- bool ignore_source_data_distribution, int
free_block_limit)
+ int free_block_limit)
: Exchanger<PartitionedBlock>(running_sink_operators, num_sources,
num_partitions,
- free_block_limit),
-
_ignore_source_data_distribution(ignore_source_data_distribution) {
+ free_block_limit) {
_data_queue.resize(num_partitions);
}
Status _split_rows(RuntimeState* state, const uint32_t* __restrict
channel_ids,
vectorized::Block* block, LocalExchangeSinkLocalState&
local_state);
-
- const bool _ignore_source_data_distribution = false;
};
class BucketShuffleExchanger final : public ShuffleExchanger {
ENABLE_FACTORY_CREATOR(BucketShuffleExchanger);
BucketShuffleExchanger(int running_sink_operators, int num_sources, int
num_partitions,
- bool ignore_source_data_distribution, int
free_block_limit)
+ int free_block_limit)
: ShuffleExchanger(running_sink_operators, num_sources,
num_partitions,
- ignore_source_data_distribution,
free_block_limit) {}
+ free_block_limit) {}
~BucketShuffleExchanger() override = default;
ExchangeType get_type() const override { return
ExchangeType::BUCKET_HASH_SHUFFLE; }
};
diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index 6e83c7805e4..96da754daa5 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -22,6 +22,7 @@
#include <utility>
#include "pipeline/exec/operator.h"
+#include "pipeline/pipeline_fragment_context.h"
#include "pipeline/pipeline_task.h"
namespace doris::pipeline {
@@ -31,7 +32,48 @@ void Pipeline::_init_profile() {
_pipeline_profile = std::make_unique<RuntimeProfile>(std::move(s));
}
-Status Pipeline::add_operator(OperatorPtr& op) {
+bool Pipeline::need_to_local_exchange(const DataDistribution
target_data_distribution,
+ const int idx) const {
+ // If serial operator exists after `idx`-th operator, we should not
improve parallelism.
+ if (std::any_of(_operators.begin() + idx, _operators.end(),
+ [&](OperatorPtr op) -> bool { return
op->is_serial_operator(); })) {
+ return false;
+ }
+ // If all operators are serial and sink is not serial, we should improve
parallelism for sink.
+ if (std::all_of(_operators.begin(), _operators.end(),
+ [&](OperatorPtr op) -> bool { return
op->is_serial_operator(); })) {
+ if (!_sink->is_serial_operator()) {
+ return true;
+ }
+ } else if (std::any_of(_operators.begin(), _operators.end(),
+ [&](OperatorPtr op) -> bool { return
op->is_serial_operator(); })) {
+ // If non-serial operators exist, we should improve parallelism for
those.
+ return true;
+ }
+
+ if (target_data_distribution.distribution_type !=
ExchangeType::BUCKET_HASH_SHUFFLE &&
+ target_data_distribution.distribution_type !=
ExchangeType::HASH_SHUFFLE) {
+ // Always do local exchange if non-hash-partition exchanger is
required.
+ // For example, `PASSTHROUGH` exchanger is always required to
distribute data evenly.
+ return true;
+ } else if (_operators.front()->is_serial_operator()) {
+ DCHECK(std::all_of(_operators.begin(), _operators.end(),
+ [&](OperatorPtr op) -> bool { return
op->is_serial_operator(); }) &&
+ _sink->is_serial_operator())
+ << debug_string();
+ // All operators and sink are serial in this path.
+ return false;
+ } else {
+ return _data_distribution.distribution_type !=
target_data_distribution.distribution_type &&
+ !(is_hash_exchange(_data_distribution.distribution_type) &&
+ is_hash_exchange(target_data_distribution.distribution_type));
+ }
+}
+
+Status Pipeline::add_operator(OperatorPtr& op, const int parallelism) {
+ if (parallelism > 0 && op->is_serial_operator()) {
+ set_num_tasks(parallelism);
+ }
op->set_parallel_tasks(num_tasks());
_operators.emplace_back(op);
if (op->is_source()) {
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 8a20ccb631c..619848110d4 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -44,14 +44,16 @@ class Pipeline : public
std::enable_shared_from_this<Pipeline> {
public:
explicit Pipeline(PipelineId pipeline_id, int num_tasks,
- std::weak_ptr<PipelineFragmentContext> context)
- : _pipeline_id(pipeline_id), _num_tasks(num_tasks) {
+ std::weak_ptr<PipelineFragmentContext> context, int
num_tasks_of_parent)
+ : _pipeline_id(pipeline_id),
+ _num_tasks(num_tasks),
+ _num_tasks_of_parent(num_tasks_of_parent) {
_init_profile();
_tasks.resize(_num_tasks, nullptr);
}
// Add operators for pipelineX
- Status add_operator(OperatorPtr& op);
+ Status add_operator(OperatorPtr& op, const int parallelism);
// prepare operators for pipelineX
Status prepare(RuntimeState* state);
@@ -71,28 +73,8 @@ public:
return idx == ExchangeType::HASH_SHUFFLE || idx ==
ExchangeType::BUCKET_HASH_SHUFFLE;
}
- bool need_to_local_exchange(const DataDistribution
target_data_distribution) const {
- if (target_data_distribution.distribution_type !=
ExchangeType::BUCKET_HASH_SHUFFLE &&
- target_data_distribution.distribution_type !=
ExchangeType::HASH_SHUFFLE) {
- return true;
- } else if (_operators.front()->ignore_data_hash_distribution()) {
- if (_data_distribution.distribution_type ==
- target_data_distribution.distribution_type &&
- (_data_distribution.partition_exprs.empty() ||
- target_data_distribution.partition_exprs.empty())) {
- return true;
- }
- return _data_distribution.distribution_type !=
- target_data_distribution.distribution_type &&
- !(is_hash_exchange(_data_distribution.distribution_type) &&
-
is_hash_exchange(target_data_distribution.distribution_type));
- } else {
- return _data_distribution.distribution_type !=
- target_data_distribution.distribution_type &&
- !(is_hash_exchange(_data_distribution.distribution_type) &&
-
is_hash_exchange(target_data_distribution.distribution_type));
- }
- }
+ bool need_to_local_exchange(const DataDistribution
target_data_distribution,
+ const int idx) const;
void init_data_distribution() {
set_data_distribution(_operators.front()->required_data_distribution());
}
@@ -120,11 +102,19 @@ public:
for (auto& op : _operators) {
op->set_parallel_tasks(_num_tasks);
}
+
+#ifndef NDEBUG
+ if (num_tasks > 1 &&
+ std::any_of(_operators.begin(), _operators.end(),
+ [&](OperatorPtr op) -> bool { return
op->is_serial_operator(); })) {
+ DCHECK(false) << debug_string();
+ }
+#endif
}
int num_tasks() const { return _num_tasks; }
bool close_task() { return _num_tasks_running.fetch_sub(1) == 1; }
- std::string debug_string() {
+ std::string debug_string() const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer,
"Pipeline [id: {}, _num_tasks: {}, _num_tasks_created:
{}]", _pipeline_id,
@@ -136,6 +126,8 @@ public:
return fmt::to_string(debug_string_buffer);
}
+ int num_tasks_of_parent() const { return _num_tasks_of_parent; }
+
private:
void _init_profile();
@@ -173,6 +165,8 @@ private:
std::atomic<int> _num_tasks_running = 0;
// Tasks in this pipeline.
std::vector<PipelineTask*> _tasks;
+ // Parallelism of parent pipeline.
+ const int _num_tasks_of_parent;
};
} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 7f3fa348237..a74f0818cb3 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -112,7 +112,6 @@
#include "vec/runtime/vdata_stream_mgr.h"
namespace doris::pipeline {
-bvar::Adder<int64_t> g_pipeline_tasks_count("doris_pipeline_tasks_count");
PipelineFragmentContext::PipelineFragmentContext(
const TUniqueId& query_id, const int fragment_id,
std::shared_ptr<QueryContext> query_ctx,
@@ -183,9 +182,10 @@ void PipelineFragmentContext::cancel(const Status reason) {
LOG(WARNING) << "PipelineFragmentContext is cancelled due to timeout :
" << debug_string();
}
+ // `ILLEGAL_STATE` means queries this fragment belongs to was not found in
FE (maybe finished)
if (reason.is<ErrorCode::ILLEGAL_STATE>()) {
LOG_WARNING("PipelineFragmentContext is cancelled due to illegal state
: {}",
- this->debug_string());
+ debug_string());
}
_query_ctx->cancel(reason, _fragment_id);
@@ -210,28 +210,20 @@ void PipelineFragmentContext::cancel(const Status reason)
{
}
}
-PipelinePtr PipelineFragmentContext::add_pipeline() {
- // _prepared、_submitted, _canceled should do not add pipeline
- PipelineId id = _next_pipeline_id++;
- auto pipeline = std::make_shared<Pipeline>(
- id, _num_instances,
-
std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()));
- _pipelines.emplace_back(pipeline);
- return pipeline;
-}
-
PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx)
{
- // _prepared、_submitted, _canceled should do not add pipeline
PipelineId id = _next_pipeline_id++;
auto pipeline = std::make_shared<Pipeline>(
- id, _num_instances,
-
std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()));
+ id, parent ? std::min(parent->num_tasks(), _num_instances) :
_num_instances,
+
std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()),
+ parent ? parent->num_tasks() : _num_instances);
if (idx >= 0) {
_pipelines.insert(_pipelines.begin() + idx, pipeline);
} else {
_pipelines.emplace_back(pipeline);
}
- parent->set_children(pipeline);
+ if (parent) {
+ parent->set_children(pipeline);
+ }
return pipeline;
}
@@ -249,7 +241,7 @@ Status PipelineFragmentContext::prepare(const
doris::TPipelineFragmentParams& re
SCOPED_TIMER(_prepare_timer);
_build_pipelines_timer = ADD_TIMER(_runtime_profile, "BuildPipelinesTime");
_init_context_timer = ADD_TIMER(_runtime_profile, "InitContextTime");
- _plan_local_shuffle_timer = ADD_TIMER(_runtime_profile,
"PlanLocalShuffleTime");
+ _plan_local_exchanger_timer = ADD_TIMER(_runtime_profile,
"PlanLocalLocalExchangerTime");
_build_tasks_timer = ADD_TIMER(_runtime_profile, "BuildTasksTime");
_prepare_all_pipelines_timer = ADD_TIMER(_runtime_profile,
"PrepareAllPipelinesTime");
{
@@ -336,14 +328,15 @@ Status PipelineFragmentContext::prepare(const
doris::TPipelineFragmentParams& re
RETURN_IF_ERROR(pipeline->sink()->set_child(pipeline->operators().back()));
}
}
- if (_enable_local_shuffle()) {
- SCOPED_TIMER(_plan_local_shuffle_timer);
+ // 4. Build local exchanger
+ if (_runtime_state->enable_local_shuffle()) {
+ SCOPED_TIMER(_plan_local_exchanger_timer);
RETURN_IF_ERROR(_plan_local_exchange(request.num_buckets,
request.bucket_seq_to_instance_idx,
request.shuffle_idx_to_instance_idx));
}
- // 4. Initialize global states in pipelines.
+ // 5. Initialize global states in pipelines.
for (PipelinePtr& pipeline : _pipelines) {
SCOPED_TIMER(_prepare_all_pipelines_timer);
pipeline->children().clear();
@@ -352,7 +345,7 @@ Status PipelineFragmentContext::prepare(const
doris::TPipelineFragmentParams& re
{
SCOPED_TIMER(_build_tasks_timer);
- // 5. Build pipeline tasks and initialize local state.
+ // 6. Build pipeline tasks and initialize local state.
RETURN_IF_ERROR(_build_pipeline_tasks(request, thread_pool));
}
@@ -380,40 +373,6 @@ Status
PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
const auto& local_params = request.local_params[i];
auto fragment_instance_id = local_params.fragment_instance_id;
_fragment_instance_ids[i] = fragment_instance_id;
- std::unique_ptr<RuntimeFilterMgr> runtime_filter_mgr;
- auto init_runtime_state = [&](std::unique_ptr<RuntimeState>&
runtime_state) {
-
runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
-
- runtime_state->set_task_execution_context(shared_from_this());
- runtime_state->set_be_number(local_params.backend_num);
-
- if (request.__isset.backend_id) {
- runtime_state->set_backend_id(request.backend_id);
- }
- if (request.__isset.import_label) {
- runtime_state->set_import_label(request.import_label);
- }
- if (request.__isset.db_name) {
- runtime_state->set_db_name(request.db_name);
- }
- if (request.__isset.load_job_id) {
- runtime_state->set_load_job_id(request.load_job_id);
- }
- if (request.__isset.wal_id) {
- runtime_state->set_wal_id(request.wal_id);
- }
-
- runtime_state->set_desc_tbl(_desc_tbl);
-
runtime_state->set_per_fragment_instance_idx(local_params.sender_id);
- runtime_state->set_num_per_fragment_instances(request.num_senders);
- runtime_state->resize_op_id_to_local_state(max_operator_id());
- runtime_state->set_max_operator_id(max_operator_id());
-
runtime_state->set_load_stream_per_node(request.load_stream_per_node);
- runtime_state->set_total_load_streams(request.total_load_streams);
- runtime_state->set_num_local_sink(request.num_local_sink);
- DCHECK(runtime_filter_mgr);
- runtime_state->set_runtime_filter_mgr(runtime_filter_mgr.get());
- };
auto filterparams = std::make_unique<RuntimeFilterParamsContext>();
@@ -432,8 +391,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const
doris::TPipelineFrag
filterparams->query_ctx = _query_ctx.get();
}
- // build local_runtime_filter_mgr for each instance
- runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(
+ auto runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(
request.query_id, filterparams.get(),
_query_ctx->query_mem_tracker);
filterparams->runtime_filter_mgr = runtime_filter_mgr.get();
@@ -470,7 +428,41 @@ Status
PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
request.fragment_id, request.query_options,
_query_ctx->query_globals,
_exec_env, _query_ctx.get());
auto& task_runtime_state = _task_runtime_states[pip_idx][i];
- init_runtime_state(task_runtime_state);
+ {
+ // Initialize runtime state for this task
+
task_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
+
+
task_runtime_state->set_task_execution_context(shared_from_this());
+
task_runtime_state->set_be_number(local_params.backend_num);
+
+ if (request.__isset.backend_id) {
+ task_runtime_state->set_backend_id(request.backend_id);
+ }
+ if (request.__isset.import_label) {
+
task_runtime_state->set_import_label(request.import_label);
+ }
+ if (request.__isset.db_name) {
+ task_runtime_state->set_db_name(request.db_name);
+ }
+ if (request.__isset.load_job_id) {
+
task_runtime_state->set_load_job_id(request.load_job_id);
+ }
+ if (request.__isset.wal_id) {
+ task_runtime_state->set_wal_id(request.wal_id);
+ }
+
+ task_runtime_state->set_desc_tbl(_desc_tbl);
+
task_runtime_state->set_per_fragment_instance_idx(local_params.sender_id);
+
task_runtime_state->set_num_per_fragment_instances(request.num_senders);
+
task_runtime_state->resize_op_id_to_local_state(max_operator_id());
+ task_runtime_state->set_max_operator_id(max_operator_id());
+
task_runtime_state->set_load_stream_per_node(request.load_stream_per_node);
+
task_runtime_state->set_total_load_streams(request.total_load_streams);
+
task_runtime_state->set_num_local_sink(request.num_local_sink);
+ DCHECK(_runtime_filter_states[i]->runtime_filter_mgr);
+ task_runtime_state->set_runtime_filter_mgr(
+ _runtime_filter_states[i]->runtime_filter_mgr);
+ }
auto cur_task_id = _total_tasks++;
task_runtime_state->set_task_id(cur_task_id);
task_runtime_state->set_task_num(pipeline->num_tasks());
@@ -502,22 +494,12 @@ Status
PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
* Finally, we have two upstream dependencies in Pipeline1
corresponding to JoinProbeOperator1
* and JoinProbeOperator2.
*/
-
- // First, set up the parent profile,task runtime state
-
- auto prepare_and_set_parent_profile = [&](PipelineTask* task, size_t
pip_idx) {
- DCHECK(pipeline_id_to_profile[pip_idx]);
- RETURN_IF_ERROR(
- task->prepare(local_params, request.fragment.output_sink,
_query_ctx.get()));
- return Status::OK();
- };
-
for (auto& _pipeline : _pipelines) {
if (pipeline_id_to_task.contains(_pipeline->id())) {
auto* task = pipeline_id_to_task[_pipeline->id()];
DCHECK(task != nullptr);
- // if this task has upstream dependency, then record them.
+ // If this task has upstream dependency, then inject it into
this task.
if (_dag.find(_pipeline->id()) != _dag.end()) {
auto& deps = _dag[_pipeline->id()];
for (auto& dep : deps) {
@@ -537,7 +519,9 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const
doris::TPipelineFrag
for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
if (pipeline_id_to_task.contains(_pipelines[pip_idx]->id())) {
auto* task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
- RETURN_IF_ERROR(prepare_and_set_parent_profile(task, pip_idx));
+ DCHECK(pipeline_id_to_profile[pip_idx]);
+ RETURN_IF_ERROR(task->prepare(local_params,
request.fragment.output_sink,
+ _query_ctx.get()));
}
}
{
@@ -549,6 +533,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const
doris::TPipelineFrag
if (target_size > 1 &&
(_runtime_state->query_options().__isset.parallel_prepare_threshold &&
target_size >
_runtime_state->query_options().parallel_prepare_threshold)) {
+ // If instances parallelism is big enough ( >
parallel_prepare_threshold), we will prepare all tasks by multi-threads
std::vector<Status> prepare_status(target_size);
std::mutex m;
std::condition_variable cv;
@@ -636,8 +621,8 @@ void PipelineFragmentContext::trigger_report_if_necessary()
{
_runtime_state->load_channel_profile()->pretty_print(&ss);
}
- VLOG_FILE << "Query " << print_id(this->get_query_id()) << "
fragment "
- << this->get_fragment_id() << " profile:\n"
+ VLOG_FILE << "Query " << print_id(get_query_id()) << " fragment "
<< get_fragment_id()
+ << " profile:\n"
<< ss.str();
}
auto st = send_report(false);
@@ -716,6 +701,9 @@ Status
PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
(followed_by_shuffled_operator || op->is_shuffled_operator()) &&
require_shuffled_data_distribution;
+ if (num_children == 0) {
+ _use_serial_source = op->is_serial_operator();
+ }
// rely on that tnodes is preorder of the plan
for (int i = 0; i < num_children; i++) {
++*node_idx;
@@ -748,9 +736,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
int idx, ObjectPool* pool, PipelinePtr cur_pipe, PipelinePtr new_pip,
DataDistribution data_distribution, bool* do_local_exchange, int
num_buckets,
const std::map<int, int>& bucket_seq_to_instance_idx,
- const std::map<int, int>& shuffle_idx_to_instance_idx,
- const bool ignore_data_hash_distribution) {
- num_buckets = num_buckets != 0 ? num_buckets : _num_instances;
+ const std::map<int, int>& shuffle_idx_to_instance_idx) {
auto& operators = cur_pipe->operators();
const auto downstream_pipeline_id = cur_pipe->id();
auto local_exchange_id = next_operator_id();
@@ -765,13 +751,12 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
const bool followed_by_shuffled_operator =
operators.size() > idx ?
operators[idx]->followed_by_shuffled_operator()
:
cur_pipe->sink()->followed_by_shuffled_operator();
- const bool should_disable_bucket_shuffle =
+ const bool use_global_hash_shuffle =
bucket_seq_to_instance_idx.empty() &&
shuffle_idx_to_instance_idx.find(-1) ==
shuffle_idx_to_instance_idx.end() &&
- followed_by_shuffled_operator;
+ followed_by_shuffled_operator && !_use_serial_source;
sink.reset(new LocalExchangeSinkOperatorX(
- sink_id, local_exchange_id,
- should_disable_bucket_shuffle ? _total_instances : _num_instances,
+ sink_id, local_exchange_id, use_global_hash_shuffle ?
_total_instances : _num_instances,
data_distribution.partition_exprs, bucket_seq_to_instance_idx));
if (bucket_seq_to_instance_idx.empty() &&
data_distribution.distribution_type ==
ExchangeType::BUCKET_HASH_SHUFFLE) {
@@ -779,8 +764,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
}
RETURN_IF_ERROR(new_pip->set_sink(sink));
RETURN_IF_ERROR(new_pip->sink()->init(data_distribution.distribution_type,
num_buckets,
- should_disable_bucket_shuffle,
- shuffle_idx_to_instance_idx));
+ use_global_hash_shuffle,
shuffle_idx_to_instance_idx));
// 2. Create and initialize LocalExchangeSharedState.
std::shared_ptr<LocalExchangeSharedState> shared_state =
@@ -791,7 +775,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
case ExchangeType::HASH_SHUFFLE:
shared_state->exchanger = ShuffleExchanger::create_unique(
std::max(cur_pipe->num_tasks(), _num_instances),
- should_disable_bucket_shuffle ? _total_instances :
_num_instances,
+ use_global_hash_shuffle ? _total_instances : _num_instances,
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
?
_runtime_state->query_options().local_exchange_free_blocks_limit
: 0);
@@ -799,7 +783,6 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
case ExchangeType::BUCKET_HASH_SHUFFLE:
shared_state->exchanger = BucketShuffleExchanger::create_unique(
std::max(cur_pipe->num_tasks(), _num_instances),
_num_instances, num_buckets,
- ignore_data_hash_distribution,
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
?
_runtime_state->query_options().local_exchange_free_blocks_limit
: 0);
@@ -929,14 +912,12 @@ Status PipelineFragmentContext::_add_local_exchange(
int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr
cur_pipe,
DataDistribution data_distribution, bool* do_local_exchange, int
num_buckets,
const std::map<int, int>& bucket_seq_to_instance_idx,
- const std::map<int, int>& shuffle_idx_to_instance_idx,
- const bool ignore_data_distribution) {
- DCHECK(_enable_local_shuffle());
- if (_num_instances <= 1) {
+ const std::map<int, int>& shuffle_idx_to_instance_idx) {
+ if (_num_instances <= 1 || cur_pipe->num_tasks_of_parent() <= 1) {
return Status::OK();
}
- if (!cur_pipe->need_to_local_exchange(data_distribution)) {
+ if (!cur_pipe->need_to_local_exchange(data_distribution, idx)) {
return Status::OK();
}
*do_local_exchange = true;
@@ -946,7 +927,7 @@ Status PipelineFragmentContext::_add_local_exchange(
auto new_pip = add_pipeline(cur_pipe, pip_idx + 1);
RETURN_IF_ERROR(_add_local_exchange_impl(
idx, pool, cur_pipe, new_pip, data_distribution,
do_local_exchange, num_buckets,
- bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx,
ignore_data_distribution));
+ bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
CHECK(total_op_num + 1 == cur_pipe->operators().size() +
new_pip->operators().size())
<< "total_op_num: " << total_op_num
@@ -959,7 +940,7 @@ Status PipelineFragmentContext::_add_local_exchange(
RETURN_IF_ERROR(_add_local_exchange_impl(
new_pip->operators().size(), pool, new_pip,
add_pipeline(new_pip, pip_idx + 2),
DataDistribution(ExchangeType::PASSTHROUGH),
do_local_exchange, num_buckets,
- bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx,
ignore_data_distribution));
+ bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
}
return Status::OK();
}
@@ -985,13 +966,8 @@ Status PipelineFragmentContext::_plan_local_exchange(
// scan node. so here use `_num_instance` to replace the `num_buckets`
to prevent dividing 0
// still keep colocate plan after local shuffle
RETURN_IF_ERROR(_plan_local_exchange(
-
_pipelines[pip_idx]->operators().front()->ignore_data_hash_distribution() ||
- num_buckets == 0
- ? _num_instances
- : num_buckets,
- pip_idx, _pipelines[pip_idx], bucket_seq_to_instance_idx,
- shuffle_idx_to_instance_idx,
-
_pipelines[pip_idx]->operators().front()->ignore_data_hash_distribution()));
+ _use_serial_source || num_buckets == 0 ? _num_instances :
num_buckets, pip_idx,
+ _pipelines[pip_idx], bucket_seq_to_instance_idx,
shuffle_idx_to_instance_idx));
}
return Status::OK();
}
@@ -999,8 +975,7 @@ Status PipelineFragmentContext::_plan_local_exchange(
Status PipelineFragmentContext::_plan_local_exchange(
int num_buckets, int pip_idx, PipelinePtr pip,
const std::map<int, int>& bucket_seq_to_instance_idx,
- const std::map<int, int>& shuffle_idx_to_instance_idx,
- const bool ignore_data_hash_distribution) {
+ const std::map<int, int>& shuffle_idx_to_instance_idx) {
int idx = 1;
bool do_local_exchange = false;
do {
@@ -1012,8 +987,7 @@ Status PipelineFragmentContext::_plan_local_exchange(
RETURN_IF_ERROR(_add_local_exchange(
pip_idx, idx, ops[idx]->node_id(),
_runtime_state->obj_pool(), pip,
ops[idx]->required_data_distribution(),
&do_local_exchange, num_buckets,
- bucket_seq_to_instance_idx,
shuffle_idx_to_instance_idx,
- ignore_data_hash_distribution));
+ bucket_seq_to_instance_idx,
shuffle_idx_to_instance_idx));
}
if (do_local_exchange) {
// If local exchange is needed for current operator, we will
split this pipeline to
@@ -1030,8 +1004,7 @@ Status PipelineFragmentContext::_plan_local_exchange(
RETURN_IF_ERROR(_add_local_exchange(
pip_idx, idx, pip->sink()->node_id(),
_runtime_state->obj_pool(), pip,
pip->sink()->required_data_distribution(), &do_local_exchange,
num_buckets,
- bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx,
- ignore_data_hash_distribution));
+ bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
}
return Status::OK();
}
@@ -1170,7 +1143,8 @@ Status
PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS
// 1. create and set the source operator of
multi_cast_data_stream_source for new pipeline
source_op.reset(new MultiCastDataStreamerSourceOperatorX(
i, pool, thrift_sink.multi_cast_stream_sink.sinks[i],
row_desc, source_id));
- RETURN_IF_ERROR(new_pipeline->add_operator(source_op));
+ RETURN_IF_ERROR(new_pipeline->add_operator(
+ source_op, params.__isset.parallel_instances ?
params.parallel_instances : 0));
// 2. create and set sink operator of data stream sender for new
pipeline
DataSinkOperatorPtr sink_op;
@@ -1219,11 +1193,8 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
op.reset(new OlapScanOperatorX(
pool, tnode, next_operator_id(), descs, _num_instances,
enable_query_cache ? request.fragment.query_cache_param :
TQueryCacheParam {}));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
- if (request.__isset.parallel_instances) {
- cur_pipe->set_num_tasks(request.parallel_instances);
- op->set_ignore_data_distribution();
- }
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
break;
}
case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: {
@@ -1232,56 +1203,41 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
_query_ctx->query_mem_tracker->is_group_commit_load = true;
#endif
op.reset(new GroupCommitOperatorX(pool, tnode, next_operator_id(),
descs, _num_instances));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
- if (request.__isset.parallel_instances) {
- cur_pipe->set_num_tasks(request.parallel_instances);
- op->set_ignore_data_distribution();
- }
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
break;
}
case doris::TPlanNodeType::JDBC_SCAN_NODE: {
if (config::enable_java_support) {
op.reset(new JDBCScanOperatorX(pool, tnode, next_operator_id(),
descs, _num_instances));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
} else {
return Status::InternalError(
"Jdbc scan node is disabled, you can change be config
enable_java_support "
"to true and restart be.");
}
- if (request.__isset.parallel_instances) {
- cur_pipe->set_num_tasks(request.parallel_instances);
- op->set_ignore_data_distribution();
- }
break;
}
case doris::TPlanNodeType::FILE_SCAN_NODE: {
op.reset(new FileScanOperatorX(pool, tnode, next_operator_id(), descs,
_num_instances));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
- if (request.__isset.parallel_instances) {
- cur_pipe->set_num_tasks(request.parallel_instances);
- op->set_ignore_data_distribution();
- }
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
break;
}
case TPlanNodeType::ES_SCAN_NODE:
case TPlanNodeType::ES_HTTP_SCAN_NODE: {
op.reset(new EsScanOperatorX(pool, tnode, next_operator_id(), descs,
_num_instances));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
- if (request.__isset.parallel_instances) {
- cur_pipe->set_num_tasks(request.parallel_instances);
- op->set_ignore_data_distribution();
- }
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
break;
}
case TPlanNodeType::EXCHANGE_NODE: {
int num_senders = find_with_default(request.per_exch_num_senders,
tnode.node_id, 0);
DCHECK_GT(num_senders, 0);
op.reset(new ExchangeSourceOperatorX(pool, tnode, next_operator_id(),
descs, num_senders));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
- if (request.__isset.parallel_instances) {
- op->set_ignore_data_distribution();
- cur_pipe->set_num_tasks(request.parallel_instances);
- }
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
break;
}
case TPlanNodeType::AGGREGATION_NODE: {
@@ -1296,7 +1252,8 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
auto cache_source_id = next_operator_id();
op.reset(new CacheSourceOperatorX(pool, cache_node_id,
cache_source_id,
request.fragment.query_cache_param));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
const auto downstream_pipeline_id = cur_pipe->id();
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
@@ -1331,7 +1288,8 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
_require_bucket_distribution));
op->set_followed_by_shuffled_operator(false);
_require_bucket_distribution = true;
- RETURN_IF_ERROR(new_pipe->add_operator(op));
+ RETURN_IF_ERROR(new_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
cur_pipe = new_pipe;
} else {
@@ -1340,7 +1298,8 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
op->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
_require_bucket_distribution =
_require_bucket_distribution ||
op->require_data_distribution();
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
}
} else if (tnode.agg_node.__isset.use_streaming_preaggregation &&
tnode.agg_node.use_streaming_preaggregation &&
@@ -1351,11 +1310,13 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
op.reset(new StreamingAggOperatorX(pool, next_operator_id(),
tnode, descs));
RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
- RETURN_IF_ERROR(new_pipe->add_operator(op));
+ RETURN_IF_ERROR(new_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
cur_pipe = new_pipe;
} else {
op.reset(new StreamingAggOperatorX(pool, next_operator_id(),
tnode, descs));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
}
} else {
// create new pipeline to add query cache operator
@@ -1371,10 +1332,12 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
}
if (enable_query_cache) {
RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
- RETURN_IF_ERROR(new_pipe->add_operator(op));
+ RETURN_IF_ERROR(new_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
cur_pipe = new_pipe;
} else {
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
}
const auto downstream_pipeline_id = cur_pipe->id();
@@ -1422,7 +1385,8 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
pool, tnode_, next_operator_id(), descs, partition_count);
probe_operator->set_inner_operators(inner_sink_operator,
inner_probe_operator);
op = std::move(probe_operator);
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
const auto downstream_pipeline_id = cur_pipe->id();
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
@@ -1446,7 +1410,8 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
op->set_followed_by_shuffled_operator(op->is_shuffled_operator());
} else {
op.reset(new HashJoinProbeOperatorX(pool, tnode,
next_operator_id(), descs));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
const auto downstream_pipeline_id = cur_pipe->id();
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
@@ -1473,7 +1438,8 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
}
case TPlanNodeType::CROSS_JOIN_NODE: {
op.reset(new NestedLoopJoinProbeOperatorX(pool, tnode,
next_operator_id(), descs));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
const auto downstream_pipeline_id = cur_pipe->id();
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
@@ -1496,7 +1462,8 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
int child_count = tnode.num_children;
op.reset(new UnionSourceOperatorX(pool, tnode, next_operator_id(),
descs));
op->set_followed_by_shuffled_operator(_require_bucket_distribution);
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
const auto downstream_pipeline_id = cur_pipe->id();
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
@@ -1524,7 +1491,8 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
} else {
op.reset(new SortSourceOperatorX(pool, tnode, next_operator_id(),
descs));
}
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
const auto downstream_pipeline_id = cur_pipe->id();
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
@@ -1551,7 +1519,8 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
}
case doris::TPlanNodeType::PARTITION_SORT_NODE: {
op.reset(new PartitionSortSourceOperatorX(pool, tnode,
next_operator_id(), descs));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
const auto downstream_pipeline_id = cur_pipe->id();
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
@@ -1569,7 +1538,8 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
}
case TPlanNodeType::ANALYTIC_EVAL_NODE: {
op.reset(new AnalyticSourceOperatorX(pool, tnode, next_operator_id(),
descs));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
const auto downstream_pipeline_id = cur_pipe->id();
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
@@ -1591,58 +1561,62 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
}
case TPlanNodeType::INTERSECT_NODE: {
RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(
- pool, tnode, descs, op, cur_pipe, parent_idx, child_idx));
+ pool, tnode, descs, op, cur_pipe, parent_idx, child_idx,
request));
op->set_followed_by_shuffled_operator(_require_bucket_distribution);
break;
}
case TPlanNodeType::EXCEPT_NODE: {
RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(
- pool, tnode, descs, op, cur_pipe, parent_idx, child_idx));
+ pool, tnode, descs, op, cur_pipe, parent_idx, child_idx,
request));
op->set_followed_by_shuffled_operator(_require_bucket_distribution);
break;
}
case TPlanNodeType::REPEAT_NODE: {
op.reset(new RepeatOperatorX(pool, tnode, next_operator_id(), descs));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
break;
}
case TPlanNodeType::TABLE_FUNCTION_NODE: {
op.reset(new TableFunctionOperatorX(pool, tnode, next_operator_id(),
descs));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
break;
}
case TPlanNodeType::ASSERT_NUM_ROWS_NODE: {
op.reset(new AssertNumRowsOperatorX(pool, tnode, next_operator_id(),
descs));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
break;
}
case TPlanNodeType::EMPTY_SET_NODE: {
op.reset(new EmptySetSourceOperatorX(pool, tnode, next_operator_id(),
descs));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
break;
}
case TPlanNodeType::DATA_GEN_SCAN_NODE: {
op.reset(new DataGenSourceOperatorX(pool, tnode, next_operator_id(),
descs));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
- if (request.__isset.parallel_instances) {
- cur_pipe->set_num_tasks(request.parallel_instances);
- op->set_ignore_data_distribution();
- }
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
break;
}
case TPlanNodeType::SCHEMA_SCAN_NODE: {
op.reset(new SchemaScanOperatorX(pool, tnode, next_operator_id(),
descs));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
break;
}
case TPlanNodeType::META_SCAN_NODE: {
op.reset(new MetaScanOperatorX(pool, tnode, next_operator_id(),
descs));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
break;
}
case TPlanNodeType::SELECT_NODE: {
op.reset(new SelectOperatorX(pool, tnode, next_operator_id(), descs));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
break;
}
default:
@@ -1658,9 +1632,11 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
template <bool is_intersect>
Status PipelineFragmentContext::_build_operators_for_set_operation_node(
ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs,
OperatorPtr& op,
- PipelinePtr& cur_pipe, int parent_idx, int child_idx) {
+ PipelinePtr& cur_pipe, int parent_idx, int child_idx,
+ const doris::TPipelineFragmentParams& request) {
op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode,
next_operator_id(), descs));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
const auto downstream_pipeline_id = cur_pipe->id();
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
@@ -1754,8 +1730,7 @@ void PipelineFragmentContext::_close_fragment_instance() {
_runtime_state->load_channel_profile()->pretty_print(&ss);
}
- LOG_INFO("Query {} fragment {} profile:\n {}",
print_id(this->_query_id),
- this->_fragment_id, ss.str());
+ LOG_INFO("Query {} fragment {} profile:\n {}", print_id(_query_id),
_fragment_id, ss.str());
}
if (_query_ctx->enable_profile()) {
@@ -1779,7 +1754,6 @@ void PipelineFragmentContext::close_a_pipeline(PipelineId
pipeline_id) {
}
}
std::lock_guard<std::mutex> l(_task_mutex);
- g_pipeline_tasks_count << -1;
++_closed_tasks;
if (_closed_tasks == _total_tasks) {
_close_fragment_instance();
@@ -1849,9 +1823,9 @@ PipelineFragmentContext::collect_realtime_profile() const
{
// we do not have mutex to protect pipeline_id_to_profile
// so we need to make sure this funciton is invoked after fragment context
// has already been prepared.
- if (!this->_prepared) {
+ if (!_prepared) {
std::string msg =
- "Query " + print_id(this->_query_id) + " collecting profile,
but its not prepared";
+ "Query " + print_id(_query_id) + " collecting profile, but its
not prepared";
DCHECK(false) << msg;
LOG_ERROR(msg);
return res;
@@ -1872,9 +1846,9 @@
PipelineFragmentContext::collect_realtime_load_channel_profile() const {
// we do not have mutex to protect pipeline_id_to_profile
// so we need to make sure this funciton is invoked after fragment context
// has already been prepared.
- if (!this->_prepared) {
+ if (!_prepared) {
std::string msg =
- "Query " + print_id(this->_query_id) + " collecting profile,
but its not prepared";
+ "Query " + print_id(_query_id) + " collecting profile, but its
not prepared";
DCHECK(false) << msg;
LOG_ERROR(msg);
return nullptr;
@@ -1882,19 +1856,19 @@
PipelineFragmentContext::collect_realtime_load_channel_profile() const {
for (auto& runtime_states : _task_runtime_states) {
for (auto& runtime_state : runtime_states) {
- if (runtime_state->runtime_profile() == nullptr) {
+ if (runtime_state == nullptr || runtime_state->runtime_profile()
== nullptr) {
continue;
}
auto tmp_load_channel_profile =
std::make_shared<TRuntimeProfileTree>();
runtime_state->runtime_profile()->to_thrift(tmp_load_channel_profile.get());
-
this->_runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
+
_runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
}
}
auto load_channel_profile = std::make_shared<TRuntimeProfileTree>();
-
this->_runtime_state->load_channel_profile()->to_thrift(load_channel_profile.get());
+
_runtime_state->load_channel_profile()->to_thrift(load_channel_profile.get());
return load_channel_profile;
}
diff --git a/be/src/pipeline/pipeline_fragment_context.h
b/be/src/pipeline/pipeline_fragment_context.h
index 822a23c54bd..2e75aeb414e 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -78,9 +78,7 @@ public:
int timeout_second() const { return _timeout; }
- PipelinePtr add_pipeline();
-
- PipelinePtr add_pipeline(PipelinePtr parent, int idx = -1);
+ PipelinePtr add_pipeline(PipelinePtr parent = nullptr, int idx = -1);
RuntimeState* get_runtime_state() { return _runtime_state.get(); }
@@ -123,7 +121,7 @@ public:
_tasks[j][i]->stop_if_finished();
}
}
- };
+ }
private:
Status _build_pipelines(ObjectPool* pool, const
doris::TPipelineFragmentParams& request,
@@ -142,7 +140,8 @@ private:
Status _build_operators_for_set_operation_node(ObjectPool* pool, const
TPlanNode& tnode,
const DescriptorTbl& descs,
OperatorPtr& op,
PipelinePtr& cur_pipe, int
parent_idx,
- int child_idx);
+ int child_idx,
+ const
doris::TPipelineFragmentParams& request);
Status _create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
const std::vector<TExpr>& output_exprs,
@@ -154,24 +153,19 @@ private:
const std::map<int, int>&
shuffle_idx_to_instance_idx);
Status _plan_local_exchange(int num_buckets, int pip_idx, PipelinePtr pip,
const std::map<int, int>&
bucket_seq_to_instance_idx,
- const std::map<int, int>&
shuffle_idx_to_instance_idx,
- const bool ignore_data_distribution);
+ const std::map<int, int>&
shuffle_idx_to_instance_idx);
void _inherit_pipeline_properties(const DataDistribution&
data_distribution,
PipelinePtr pipe_with_source,
PipelinePtr pipe_with_sink);
Status _add_local_exchange(int pip_idx, int idx, int node_id, ObjectPool*
pool,
PipelinePtr cur_pipe, DataDistribution
data_distribution,
bool* do_local_exchange, int num_buckets,
const std::map<int, int>&
bucket_seq_to_instance_idx,
- const std::map<int, int>&
shuffle_idx_to_instance_idx,
- const bool ignore_data_distribution);
+ const std::map<int, int>&
shuffle_idx_to_instance_idx);
Status _add_local_exchange_impl(int idx, ObjectPool* pool, PipelinePtr
cur_pipe,
PipelinePtr new_pip, DataDistribution
data_distribution,
bool* do_local_exchange, int num_buckets,
const std::map<int, int>&
bucket_seq_to_instance_idx,
- const std::map<int, int>&
shuffle_idx_to_instance_idx,
- const bool ignore_data_hash_distribution);
-
- bool _enable_local_shuffle() const { return
_runtime_state->enable_local_shuffle(); }
+ const std::map<int, int>&
shuffle_idx_to_instance_idx);
Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request,
ThreadPool* thread_pool);
@@ -206,7 +200,7 @@ private:
RuntimeProfile::Counter* _prepare_timer = nullptr;
RuntimeProfile::Counter* _init_context_timer = nullptr;
RuntimeProfile::Counter* _build_pipelines_timer = nullptr;
- RuntimeProfile::Counter* _plan_local_shuffle_timer = nullptr;
+ RuntimeProfile::Counter* _plan_local_exchanger_timer = nullptr;
RuntimeProfile::Counter* _prepare_all_pipelines_timer = nullptr;
RuntimeProfile::Counter* _build_tasks_timer = nullptr;
@@ -228,6 +222,7 @@ private:
int _num_instances = 1;
int _timeout = -1;
+ bool _use_serial_source = false;
OperatorPtr _root_op = nullptr;
// this is a [n * m] matrix. n is parallelism of pipeline engine and m is
the number of pipelines.
@@ -290,7 +285,20 @@ private:
// - _task_runtime_states is at the task level, unique to each task.
std::vector<TUniqueId> _fragment_instance_ids;
- // Local runtime states for each task
+ /**
+ * Local runtime states for each task.
+ *
+ * 2-D matrix:
+ * +-------------------------+------------+-------+
+ * | | Instance 0 | Instance 1 | ... |
+ * +------------+------------+------------+-------+
+ * | Pipeline 0 | task 0-0 | task 0-1 | ... |
+ * +------------+------------+------------+-------+
+ * | Pipeline 1 | task 1-0 | task 1-1 | ... |
+ * +------------+------------+------------+-------+
+ * | ... |
+ * +--------------------------------------+-------+
+ */
std::vector<std::vector<std::unique_ptr<RuntimeState>>>
_task_runtime_states;
std::vector<std::unique_ptr<RuntimeFilterParamsContext>>
_runtime_filter_states;
diff --git a/be/src/vec/aggregate_functions/aggregate_function_window.h
b/be/src/vec/aggregate_functions/aggregate_function_window.h
index 92e22c895c4..517871e2fb6 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_window.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_window.h
@@ -41,15 +41,10 @@
#include "vec/core/types.h"
#include "vec/data_types/data_type_number.h"
-namespace doris {
-namespace vectorized {
+namespace doris::vectorized {
class Arena;
class BufferReadable;
class BufferWritable;
-} // namespace vectorized
-} // namespace doris
-
-namespace doris::vectorized {
struct RowNumberData {
int64_t count = 0;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
index 4dca9384d65..446f49c3782 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
@@ -488,6 +488,12 @@ public class AggregationNode extends PlanNode {
}
}
+ // If `GroupingExprs` is empty and agg need to finalize, the result must
be output by single instance
+ @Override
+ public boolean isSerialOperator() {
+ return aggInfo.getGroupingExprs().isEmpty() && needsFinalize;
+ }
+
public void setColocate(boolean colocate) {
isColocate = colocate;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java
index cdbf827aed9..7b5998717a2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java
@@ -296,4 +296,15 @@ public class AnalyticEvalNode extends PlanNode {
return output.toString();
}
+
+ /**
+ * If `partitionExprs` is empty, the result must be output by single
instance.
+ *
+ * For example, for `window (colA order by colB)`,
+ * all data should be input in this node to ensure the global ordering by
colB.
+ */
+ @Override
+ public boolean isSerialOperator() {
+ return partitionExprs.isEmpty();
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/AssertNumRowsNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/AssertNumRowsNode.java
index 57d9ce8742f..a4c4aa42c65 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AssertNumRowsNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AssertNumRowsNode.java
@@ -116,4 +116,9 @@ public class AssertNumRowsNode extends PlanNode {
public int getNumInstances() {
return 1;
}
+
+ @Override
+ public boolean isSerialOperator() {
+ return true;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
index 9c6ba83408a..ce57a57c377 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
@@ -90,6 +90,10 @@ public class DataPartition {
return type == TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED;
}
+ public boolean isTabletSinkShufflePartition() {
+ return type == TPartitionType.TABLET_SINK_SHUFFLE_PARTITIONED;
+ }
+
public TPartitionType getType() {
return type;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java
index 867c220d9fe..e262797a4fb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java
@@ -80,5 +80,4 @@ public class EmptySetNode extends PlanNode {
public int getNumInstances() {
return 1;
}
-
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
index 4ada9a82f7c..97d46b109b7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
@@ -202,4 +202,33 @@ public class ExchangeNode extends PlanNode {
public void setRightChildOfBroadcastHashJoin(boolean value) {
isRightChildOfBroadcastHashJoin = value;
}
+
+ /**
+ * If table `t1` has unique key `k1` and value column `v1`.
+ * Now use plan below to load data into `t1`:
+ * ```
+ * FRAGMENT 0:
+ * Merging Exchange (id = 1)
+ * NL Join (id = 2)
+ * DataStreamSender (id = 3, dst_id = 3) (TABLET_SINK_SHUFFLE_PARTITIONED)
+ *
+ * FRAGMENT 1:
+ * Exchange (id = 3)
+ * OlapTableSink (id = 4) ```
+ *
+ * In this plan, `Exchange (id = 1)` needs to do merge sort using column
`k1` and `v1` so parallelism
+ * of FRAGMENT 0 must be 1 and data will be shuffled to FRAGMENT 1 which
also has only 1 instance
+ * because this loading job relies on the global ordering of column `k1`
and `v1`.
+ *
+ * So FRAGMENT 0 should not use serial source.
+ */
+ @Override
+ public boolean isSerialOperator() {
+ return partitionType == TPartitionType.UNPARTITIONED && mergeInfo !=
null;
+ }
+
+ @Override
+ public boolean hasSerialChildren() {
+ return isSerialOperator();
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java
index 91a3c26e770..5dc81e29d85 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java
@@ -597,7 +597,6 @@ public abstract class JoinNodeBase extends PlanNode {
this.useSpecificProjections = useSpecificProjections;
}
-
public boolean isUseSpecificProjections() {
return useSpecificProjections;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
index 30c0a2d0394..e2a7504a98d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
@@ -281,4 +281,19 @@ public class NestedLoopJoinNode extends JoinNodeBase {
}
return output.toString();
}
+
+ /**
+ * If joinOp is one of type below:
+ * 1. RIGHT_OUTER_JOIN
+ * 2. RIGHT_ANTI_JOIN
+ * 3. RIGHT_SEMI_JOIN
+ * 4. FULL_OUTER_JOIN
+ *
+ * Probe-side must have full data so join is a serial operator.
+ */
+ @Override
+ public boolean isSerialOperator() {
+ return joinOp == JoinOperator.RIGHT_OUTER_JOIN || joinOp ==
JoinOperator.RIGHT_ANTI_JOIN
+ || joinOp == JoinOperator.RIGHT_SEMI_JOIN || joinOp ==
JoinOperator.FULL_OUTER_JOIN;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
index ae1d34308a3..0ebd023ed41 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
@@ -162,6 +162,7 @@ public class PlanFragment extends TreeNode<PlanFragment> {
public Optional<NereidsSpecifyInstances<ScanSource>> specifyInstances =
Optional.empty();
public TQueryCacheParam queryCacheParam;
+ private int numBackends = 0;
/**
* C'tor for fragment with specific partition; the output is by default
broadcast.
@@ -502,4 +503,23 @@ public class PlanFragment extends TreeNode<PlanFragment> {
public boolean hasNullAwareLeftAntiJoin() {
return planRoot.isNullAwareLeftAntiJoin();
}
+
+ public boolean useSerialSource(ConnectContext context) {
+ return context != null
+ &&
context.getSessionVariable().isIgnoreStorageDataDistribution()
+ && queryCacheParam == null
+ && !hasNullAwareLeftAntiJoin()
+ // If planRoot is not a serial operator and has serial
children, we can use serial source and improve
+ // parallelism of non-serial operators.
+ && sink instanceof DataStreamSink &&
!planRoot.isSerialOperator()
+ && planRoot.hasSerialChildren();
+ }
+
+ public int getNumBackends() {
+ return numBackends;
+ }
+
+ public void setNumBackends(int numBackends) {
+ this.numBackends = numBackends;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index 1e9d5646939..14bd34e93e1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -41,6 +41,7 @@ import org.apache.doris.common.Pair;
import org.apache.doris.common.TreeNode;
import org.apache.doris.common.UserException;
import org.apache.doris.planner.normalize.Normalizer;
+import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.PlanStats;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.statistics.StatsDeriveResult;
@@ -639,6 +640,7 @@ public abstract class PlanNode extends TreeNode<PlanNode>
implements PlanStats {
TPlanNode msg = new TPlanNode();
msg.node_id = id.asInt();
msg.setNereidsId(nereidsId);
+ msg.setIsSerialOperator(isSerialOperator() &&
fragment.useSerialSource(ConnectContext.get()));
msg.num_children = children.size();
msg.limit = limit;
for (TupleId tid : tupleIds) {
@@ -1374,4 +1376,16 @@ public abstract class PlanNode extends
TreeNode<PlanNode> implements PlanStats {
return true;
});
}
+
+    /**
+     * Whether this operator has to be executed serially (e.g. a finalized aggregation
+     * without grouping keys). Subclasses override this; the base class answers no.
+     */
+    public boolean isSerialOperator() {
+        return false;
+    }
+
+    /**
+     * Returns true when every leaf of the subtree rooted at this node reports
+     * isSerialOperator(); a node without children answers for itself.
+     */
+    public boolean hasSerialChildren() {
+        if (!children.isEmpty()) {
+            for (PlanNode child : children) {
+                if (!child.hasSerialChildren()) {
+                    return false;
+                }
+            }
+            return true;
+        }
+        return isSerialOperator();
+    }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java
index 3c6a88cea08..2bc4e847ac3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java
@@ -200,4 +200,10 @@ public class RepeatNode extends PlanNode {
}
return output.toString();
}
+
+    /** A repeat node inherits its serial-ness from its first (input) child. */
+    @Override
+    public boolean isSerialOperator() {
+        PlanNode input = children.get(0);
+        return input.isSerialOperator();
+    }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index 50b0f5a0269..ea5d27bb17f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -836,4 +836,11 @@ public abstract class ScanNode extends PlanNode implements
SplitGenerator {
public long getSelectedSplitNum() {
return selectedSplitNum;
}
+
+    /**
+     * A scan is reported as serial when
+     * <ul>
+     *   <li>there are no scan backends, or</li>
+     *   <li>it has fewer scan ranges than getParallelExecInstanceNum() * numScanBackends(), or</li>
+     *   <li>the session has force_to_local_shuffle in effect.</li>
+     * </ul>
+     *
+     * <p>ConnectContext.get() can be null, which is why the force-shuffle check was already
+     * guarded. The scan-range comparison must be guarded too; otherwise a context-less
+     * caller gets a NullPointerException. Without a context no serial source is used
+     * (PlanFragment.useSerialSource(null) is false), so answering false is safe.
+     */
+    @Override
+    public boolean isSerialOperator() {
+        if (numScanBackends() <= 0) {
+            return true;
+        }
+        // Read the thread's context once instead of three times, and tolerate its absence.
+        ConnectContext context = ConnectContext.get();
+        if (context == null) {
+            return false;
+        }
+        return getScanRangeNum()
+                < context.getSessionVariable().getParallelExecInstanceNum() * numScanBackends()
+                || context.getSessionVariable().isForceToLocalShuffle();
+    }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SelectNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/SelectNode.java
index 6c6b665b00a..734e9338352 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SelectNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SelectNode.java
@@ -109,4 +109,10 @@ public class SelectNode extends PlanNode {
}
return output.toString();
}
+
+    /** A select node inherits its serial-ness from its first (input) child. */
+    @Override
+    public boolean isSerialOperator() {
+        PlanNode input = children.get(0);
+        return input.isSerialOperator();
+    }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
index e3c405bcbab..e3eb08c3e75 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
@@ -389,6 +389,12 @@ public class SortNode extends PlanNode {
return new HashSet<>(result);
}
+    /**
+     * A sort is serial only when it is neither an analytic sort nor merged by a following
+     * exchange node: in that case it must emit globally ordered data by itself.
+     * An analytic sort, or one whose output a downstream exchange merges, is not serial.
+     */
+    @Override
+    public boolean isSerialOperator() {
+        return !isAnalyticSort && !mergeByexchange;
+    }
+
public void setColocate(boolean colocate) {
isColocate = colocate;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java
index 40982d07e77..ac66ce718ec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java
@@ -42,4 +42,11 @@ public class UnionNode extends SetOperationNode {
protected void toThrift(TPlanNode msg) {
toThrift(msg, TPlanNodeType.UNION_NODE);
}
+
+    /**
+     * A union with no children only outputs constant values; it is executed serially so
+     * that those values are not duplicated.
+     */
+    @Override
+    public boolean isSerialOperator() {
+        int childCount = children.size();
+        return childCount == 0;
+    }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index b9f90242a29..9829f88cf52 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1777,6 +1777,19 @@ public class Coordinator implements CoordInterface {
FInstanceExecParam instanceParam = new
FInstanceExecParam(null, execHostport,
0, params);
params.instanceExecParams.add(instanceParam);
+
+ // Using serial source means a serial source operator will be
used in this fragment (e.g. data will be
+ // shuffled to only 1 exchange operator) and then splitted by
followed local exchanger
+ int expectedInstanceNum = fragment.getParallelExecNum();
+ boolean useSerialSource = fragment.useSerialSource(context);
+ if (useSerialSource) {
+ for (int j = 1; j < expectedInstanceNum; j++) {
+ params.instanceExecParams.add(new FInstanceExecParam(
+ null, execHostport, 0, params));
+ }
+ params.ignoreDataDistribution = true;
+ params.parallelTasksNum = 1;
+ }
continue;
}
@@ -1806,6 +1819,9 @@ public class Coordinator implements CoordInterface {
if (leftMostNode.getNumInstances() == 1) {
exchangeInstances = 1;
}
+ // Using serial source means a serial source operator will be
used in this fragment (e.g. data will be
+ // shuffled to only 1 exchange operator) and then splitted by
followed local exchanger
+ boolean useSerialSource = fragment.useSerialSource(context);
if (exchangeInstances > 0 &&
fragmentExecParamsMap.get(inputFragmentId)
.instanceExecParams.size() > exchangeInstances) {
// random select some instance
@@ -1823,12 +1839,16 @@ public class Coordinator implements CoordInterface {
hosts.get(index % hosts.size()), 0, params);
params.instanceExecParams.add(instanceParam);
}
+ params.ignoreDataDistribution = useSerialSource;
+ params.parallelTasksNum = useSerialSource ? 1 :
params.instanceExecParams.size();
} else {
for (FInstanceExecParam execParams
:
fragmentExecParamsMap.get(inputFragmentId).instanceExecParams) {
FInstanceExecParam instanceParam = new
FInstanceExecParam(null, execParams.host, 0, params);
params.instanceExecParams.add(instanceParam);
}
+ params.ignoreDataDistribution = useSerialSource;
+ params.parallelTasksNum = useSerialSource ? 1 :
params.instanceExecParams.size();
}
// When group by cardinality is smaller than number of
backend, only some backends always
@@ -1874,12 +1894,8 @@ public class Coordinator implements CoordInterface {
boolean sharedScan = true;
int expectedInstanceNum =
Math.min(parallelExecInstanceNum,
leftMostNode.getNumInstances());
- boolean forceToLocalShuffle = context != null
- &&
context.getSessionVariable().isForceToLocalShuffle()
- && !fragment.hasNullAwareLeftAntiJoin() &&
useNereids;
- boolean ignoreStorageDataDistribution =
(forceToLocalShuffle || (node.isPresent()
- &&
node.get().ignoreStorageDataDistribution(context, addressToBackendID.size())
- && useNereids)) && fragment.queryCacheParam ==
null;
+ boolean ignoreStorageDataDistribution =
node.isPresent()
+ && fragment.useSerialSource(context);
if (node.isPresent() && ignoreStorageDataDistribution)
{
expectedInstanceNum =
Math.max(expectedInstanceNum, 1);
// if have limit and no conjuncts, only need 1
instance to save cpu and
@@ -2726,14 +2742,7 @@ public class Coordinator implements CoordInterface {
* 1. `parallelExecInstanceNum * numBackends` is larger than scan
ranges.
* 2. Use Nereids planner.
*/
- boolean forceToLocalShuffle = context != null
- && context.getSessionVariable().isForceToLocalShuffle() &&
!hasNullAwareLeftAntiJoin && useNereids;
- boolean ignoreStorageDataDistribution = (forceToLocalShuffle ||
(scanNodes.stream()
- .allMatch(node -> node.ignoreStorageDataDistribution(context,
- addressToBackendID.size()))
- &&
addressToScanRanges.entrySet().stream().allMatch(addressScanRange -> {
- return addressScanRange.getValue().size() <
parallelExecInstanceNum;
- }) && useNereids)) && params.fragment.queryCacheParam == null;
+ boolean ignoreStorageDataDistribution = params.fragment != null &&
params.fragment.useSerialSource(context);
FragmentScanRangeAssignment assignment = params.scanRangeAssignment;
for (Map.Entry<TNetworkAddress, List<Pair<Integer, Map<Integer,
List<TScanRangeParams>>>>> addressScanRange
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 89a18589f35..dee675a30d5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -4291,7 +4291,7 @@ public class SessionVariable implements Serializable,
Writable {
}
public boolean isIgnoreStorageDataDistribution() {
-        return ignoreStorageDataDistribution && enableLocalShuffle;
+        // Takes effect only when local shuffle and the Nereids planner are both enabled.
+        return enableNereidsPlanner && enableLocalShuffle && ignoreStorageDataDistribution;
     }
public void setIgnoreStorageDataDistribution(boolean
ignoreStorageDataDistribution) {
@@ -4329,7 +4329,7 @@ public class SessionVariable implements Serializable,
Writable {
}
public boolean isForceToLocalShuffle() {
-        return enableLocalShuffle && forceToLocalShuffle;
+        // Takes effect only when local shuffle and the Nereids planner are both enabled.
+        return enableNereidsPlanner && forceToLocalShuffle && enableLocalShuffle;
     }
public void setForceToLocalShuffle(boolean forceToLocalShuffle) {
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 1fcf5aff254..3f557bea711 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -1364,6 +1364,7 @@ struct TPlanNode {
49: optional i64 push_down_count
50: optional list<list<Exprs.TExpr>> distribute_expr_lists
+ 51: optional bool is_serial_operator
// projections is final projections, which means projecting into results and
materializing them into the output block.
101: optional list<Exprs.TExpr> projections
102: optional Types.TTupleId output_tuple_id
diff --git
a/regression-test/suites/nereids_p0/insert_into_table/complex_insert.groovy
b/regression-test/suites/nereids_p0/insert_into_table/complex_insert.groovy
index 2493a7df5de..049cbe0b4d7 100644
--- a/regression-test/suites/nereids_p0/insert_into_table/complex_insert.groovy
+++ b/regression-test/suites/nereids_p0/insert_into_table/complex_insert.groovy
@@ -177,15 +177,15 @@ suite('complex_insert') {
sql 'insert into t1(id, c1, c2, c3) select id, c1 * 2, c2, c3 from t1'
sql 'sync'
- qt_sql_1 'select * from t1, t2, t3 order by t1.id, t1.id1, t2.id, t3.id'
+ qt_sql_1 'select * from t1, t2, t3 order by t1.id, t1.id1, t2.id, t2.c1,
t3.id'
sql 'insert into t2(id, c1, c2, c3) select id, c1, c2 * 2, c3 from t2'
sql 'sync'
- qt_sql_2 'select * from t1, t2, t3 order by t1.id, t1.id1, t2.id, t3.id'
+ qt_sql_2 'select * from t1, t2, t3 order by t1.id, t1.id1, t2.id, t2.c1,
t3.id'
sql 'insert into t2(c1, c3) select c1 + 1, c3 + 1 from (select id, c1, c3
from t1 order by id, c1 limit 10) t1, t3'
sql 'sync'
- qt_sql_3 'select * from t1, t2, t3 order by t1.id, t1.id1, t2.id, t3.id'
+ qt_sql_3 'select * from t1, t2, t3 order by t1.id, t1.id1, t2.id, t2.c1,
t3.id'
sql 'drop table if exists agg_have_dup_base'
diff --git
a/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy
b/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy
index 997230b1a06..d701ad890d6 100644
--- a/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy
+++ b/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy
@@ -45,14 +45,14 @@ suite("local_shuffle") {
insert into test_local_shuffle1 values (1, 1), (2, 2);
insert into test_local_shuffle2 values (2, 2), (3, 3);
- set enable_nereids_distribute_planner=true;
+ // set enable_nereids_distribute_planner=true;
set enable_pipeline_x_engine=true;
set disable_join_reorder=true;
set enable_local_shuffle=true;
set force_to_local_shuffle=true;
"""
- order_qt_read_single_olap_table "select * from test_local_shuffle1"
+ order_qt_read_single_olap_table "select * from test_local_shuffle1 order
by id, id2"
order_qt_broadcast_join """
select *
@@ -96,7 +96,7 @@ suite("local_shuffle") {
) a
right outer join [shuffle]
test_local_shuffle2
- on a.id=test_local_shuffle2.id2
+ on a.id=test_local_shuffle2.id2 order by test_local_shuffle2.id,
test_local_shuffle2.id2
"""
order_qt_bucket_shuffle_with_prune_tablets2 """
@@ -109,7 +109,7 @@ suite("local_shuffle") {
from test_local_shuffle1
where id=1
) a
- on a.id=test_local_shuffle2.id2
+ on a.id=test_local_shuffle2.id2 order by test_local_shuffle2.id,
test_local_shuffle2.id2
"""
order_qt_bucket_shuffle_with_prune_tablets3 """
@@ -150,11 +150,11 @@ suite("local_shuffle") {
"""
order_qt_fillup_bucket """
- SELECT cast(a.c0 as int), cast(b.c0 as int) FROM
+ SELECT cast(a.c0 as int), cast(b.c0 as int) res FROM
(select * from test_local_shuffle3 where c0 =1)a
RIGHT OUTER JOIN
(select * from test_local_shuffle4)b
- ON a.c0 = b.c0
+ ON a.c0 = b.c0 order by res
"""
multi_sql """
@@ -182,6 +182,6 @@ suite("local_shuffle") {
) a
inner join [shuffle]
test_shuffle_left_with_local_shuffle b
- on a.id2=b.id;
+ on a.id2=b.id order by a.id2;
"""
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]