This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b6cd23052d4 [fix](spill) disable partitioned agg when group by limit
opt is set (#37873)
b6cd23052d4 is described below
commit b6cd23052d4d55daa296f45333938043faa03bef
Author: Jerry Hu <[email protected]>
AuthorDate: Thu Jul 18 10:16:24 2024 +0800
[fix](spill) disable partitioned agg when group by limit opt is set (#37873)
## Proposed changes
Disable partitioned agg when the group by limit opt (#29641) is set, because PartitionedAggSourceOperatorX does not support that optimization yet.
---
be/src/pipeline/pipeline_fragment_context.cpp | 18 +++++++++++++-----
1 file changed, 13 insertions(+), 5 deletions(-)
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 39555d3614e..1ab40723641 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1182,11 +1182,19 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
return Status::InternalError("Illegal aggregate node " +
std::to_string(tnode.node_id) +
": group by and output is empty");
}
- if (tnode.agg_node.aggregate_functions.empty() &&
!_runtime_state->enable_agg_spill() &&
+
+ const bool group_by_limit_opt =
+ tnode.agg_node.__isset.agg_sort_info_by_group_key &&
tnode.limit > 0;
+
+ /// PartitionedAggSourceOperatorX does not support "group by limit
opt(#29641)" yet.
+ /// If `group_by_limit_opt` is true, then it might not need to spill
at all.
+ const bool enable_spill = _runtime_state->enable_agg_spill() &&
+ !tnode.agg_node.grouping_exprs.empty() &&
!group_by_limit_opt;
+
+ if (tnode.agg_node.aggregate_functions.empty() && !enable_spill &&
request.query_options.__isset.enable_distinct_streaming_aggregation &&
request.query_options.enable_distinct_streaming_aggregation &&
- !tnode.agg_node.grouping_exprs.empty() &&
- !tnode.agg_node.__isset.agg_sort_info_by_group_key) {
+ !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt) {
op.reset(new DistinctStreamingAggOperatorX(pool,
next_operator_id(), tnode, descs,
_require_bucket_distribution));
_require_bucket_distribution =
@@ -1198,7 +1206,7 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
op.reset(new StreamingAggOperatorX(pool, next_operator_id(),
tnode, descs));
RETURN_IF_ERROR(cur_pipe->add_operator(op));
} else {
- if (_runtime_state->enable_agg_spill() &&
!tnode.agg_node.grouping_exprs.empty()) {
+ if (enable_spill) {
op.reset(new PartitionedAggSourceOperatorX(pool, tnode,
next_operator_id(), descs));
} else {
op.reset(new AggSourceOperatorX(pool, tnode,
next_operator_id(), descs));
@@ -1213,7 +1221,7 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
_dag[downstream_pipeline_id].push_back(cur_pipe->id());
DataSinkOperatorXPtr sink;
- if (_runtime_state->enable_agg_spill() &&
!tnode.agg_node.grouping_exprs.empty()) {
+ if (enable_spill) {
sink.reset(new PartitionedAggSinkOperatorX(pool,
next_sink_operator_id(), tnode,
descs,
_require_bucket_distribution));
} else {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]