This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9bffc246959 [Improvement](pipeline) Use hash shuffle for 1-phase
Agg/Analytic operator #34122
9bffc246959 is described below
commit 9bffc246959af69af494b90a6fdf581bea4e5df5
Author: Gabriel <[email protected]>
AuthorDate: Fri Apr 26 13:47:43 2024 +0800
[Improvement](pipeline) Use hash shuffle for 1-phase Agg/Analytic operator
#34122
---
be/src/pipeline/exec/aggregation_sink_operator.cpp | 11 +++++++----
be/src/pipeline/exec/aggregation_sink_operator.h | 2 +-
.../pipeline/exec/partitioned_aggregation_sink_operator.cpp | 6 ++++--
be/src/pipeline/exec/partitioned_aggregation_sink_operator.h | 2 +-
be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp | 10 ++++++++--
be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h | 2 ++
6 files changed, 23 insertions(+), 10 deletions(-)
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index fd88b0d1521..6c9d27e2a2b 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -616,7 +616,7 @@ void AggSinkLocalState::_init_hash_method(const
vectorized::VExprContextSPtrs& p
}
AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const
TPlanNode& tnode,
- const DescriptorTbl& descs)
+ const DescriptorTbl& descs, bool
require_bucket_distribution)
: DataSinkOperatorX<AggSinkLocalState>(operator_id, tnode.node_id),
_intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
_intermediate_tuple_desc(nullptr),
@@ -629,9 +629,12 @@ AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int
operator_id, const TPla
_limit(tnode.limit),
_have_conjuncts((tnode.__isset.vconjunct &&
!tnode.vconjunct.nodes.empty()) ||
(tnode.__isset.conjuncts &&
!tnode.conjuncts.empty())),
- _partition_exprs(tnode.__isset.distribute_expr_lists ?
tnode.distribute_expr_lists[0]
- :
std::vector<TExpr> {}),
- _is_colocate(tnode.agg_node.__isset.is_colocate &&
tnode.agg_node.is_colocate),
+ _partition_exprs(require_bucket_distribution ?
(tnode.__isset.distribute_expr_lists
+ ?
tnode.distribute_expr_lists[0]
+ :
std::vector<TExpr> {})
+ :
tnode.agg_node.grouping_exprs),
+ _is_colocate(tnode.agg_node.__isset.is_colocate &&
tnode.agg_node.is_colocate &&
+ require_bucket_distribution),
_agg_fn_output_row_descriptor(descs, tnode.row_tuples,
tnode.nullable_tuples) {}
Status AggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h
b/be/src/pipeline/exec/aggregation_sink_operator.h
index b3ffa19d6db..0c34acfd7df 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -143,7 +143,7 @@ protected:
class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> {
public:
AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
- const DescriptorTbl& descs);
+ const DescriptorTbl& descs, bool
require_bucket_distribution);
~AggSinkOperatorX() override = default;
Status init(const TDataSink& tsink) override {
return Status::InternalError("{} should not init with TPlanNode",
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 78079a0ddf8..7eb09555aa8 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -122,9 +122,11 @@ void
PartitionedAggSinkLocalState::update_profile(RuntimeProfile* child_profile)
PartitionedAggSinkOperatorX::PartitionedAggSinkOperatorX(ObjectPool* pool, int
operator_id,
const TPlanNode&
tnode,
- const DescriptorTbl&
descs)
+ const DescriptorTbl&
descs,
+ bool
require_bucket_distribution)
: DataSinkOperatorX<PartitionedAggSinkLocalState>(operator_id,
tnode.node_id) {
- _agg_sink_operator = std::make_unique<AggSinkOperatorX>(pool, operator_id,
tnode, descs);
+ _agg_sink_operator = std::make_unique<AggSinkOperatorX>(pool, operator_id,
tnode, descs,
+
require_bucket_distribution);
}
Status PartitionedAggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState*
state) {
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 1755cd866f2..1233f66b562 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -294,7 +294,7 @@ public:
class PartitionedAggSinkOperatorX : public
DataSinkOperatorX<PartitionedAggSinkLocalState> {
public:
PartitionedAggSinkOperatorX(ObjectPool* pool, int operator_id, const
TPlanNode& tnode,
- const DescriptorTbl& descs);
+ const DescriptorTbl& descs, bool
require_bucket_distribution);
~PartitionedAggSinkOperatorX() override = default;
Status init(const TDataSink& tsink) override {
return Status::InternalError("{} should not init with TPlanNode",
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index bf2c255a127..fc0234c6290 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -1034,14 +1034,16 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
DataSinkOperatorXPtr sink;
if (_runtime_state->enable_agg_spill() &&
!tnode.agg_node.grouping_exprs.empty()) {
sink.reset(new PartitionedAggSinkOperatorX(pool,
next_sink_operator_id(), tnode,
- descs));
+ descs,
_require_bucket_distribution));
} else {
- sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(),
tnode, descs));
+ sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(),
tnode, descs,
+ _require_bucket_distribution));
}
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode,
_runtime_state.get()));
}
+ _require_bucket_distribution = true;
break;
}
case TPlanNodeType::HASH_JOIN_NODE: {
@@ -1106,6 +1108,7 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
_pipeline_parent_map.push(op->node_id(), cur_pipe);
_pipeline_parent_map.push(op->node_id(), build_side_pipe);
}
+ _require_bucket_distribution = true;
break;
}
case TPlanNodeType::CROSS_JOIN_NODE: {
@@ -1211,6 +1214,7 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
+ _require_bucket_distribution = true;
break;
}
case TPlanNodeType::INTERSECT_NODE: {
@@ -1268,6 +1272,8 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
print_plan_node_type(tnode.node_type));
}
+ _require_bucket_distribution = true;
+
return Status::OK();
}
// NOLINTEND(readability-function-cognitive-complexity)
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index 31febc0d8aa..c87f8f4f784 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -239,6 +239,8 @@ private:
// Total instance num running on all BEs
int _total_instances = -1;
+
+ bool _require_bucket_distribution = false;
};
} // namespace pipeline
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]