Copilot commented on code in PR #61495: URL: https://github.com/apache/doris/pull/61495#discussion_r2954627571
########## fe/fe-core/src/main/java/org/apache/doris/planner/BucketedAggregationNode.java: ########## @@ -0,0 +1,106 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner; + +import org.apache.doris.analysis.AggregateInfo; +import org.apache.doris.analysis.ExprToThriftVisitor; +import org.apache.doris.analysis.FunctionCallExpr; +import org.apache.doris.thrift.TBucketedAggregationNode; +import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.thrift.TExpr; +import org.apache.doris.thrift.TPlanNode; +import org.apache.doris.thrift.TPlanNodeType; + +import com.google.common.collect.Lists; + +import java.util.List; + +/** + * Bucketed hash aggregation node. + * + * Fuses two-phase aggregation (local + global) into a single BE operator for single-BE deployments. + * Produces a BUCKETED_AGGREGATION_NODE in the Thrift plan, which the BE maps to + * BucketedAggSinkOperatorX / BucketedAggSourceOperatorX. 
+ */ +public class BucketedAggregationNode extends PlanNode { + private final AggregateInfo aggInfo; + private final boolean needsFinalize; + + public BucketedAggregationNode(PlanNodeId id, PlanNode input, AggregateInfo aggInfo, + boolean needsFinalize) { + super(id, aggInfo.getOutputTupleId().asList(), "BUCKETED AGGREGATE"); + this.aggInfo = aggInfo; + this.needsFinalize = needsFinalize; + this.children.add(input); + } + + @Override + protected void toThrift(TPlanNode msg) { + msg.node_type = TPlanNodeType.BUCKETED_AGGREGATION_NODE; + + List<TExpr> aggregateFunctions = Lists.newArrayList(); + for (FunctionCallExpr e : aggInfo.getMaterializedAggregateExprs()) { + aggregateFunctions.add(ExprToThriftVisitor.treeToThrift(e)); + } + + List<TExpr> groupingExprs = Lists.newArrayList(); + if (aggInfo.getGroupingExprs() != null) { + groupingExprs = ExprToThriftVisitor.treesToThrift(aggInfo.getGroupingExprs()); + } + + TBucketedAggregationNode bucketedAggNode = new TBucketedAggregationNode(); + bucketedAggNode.setGroupingExprs(groupingExprs); + bucketedAggNode.setAggregateFunctions(aggregateFunctions); + bucketedAggNode.setIntermediateTupleId(aggInfo.getOutputTupleId().asInt()); Review Comment: `intermediate_tuple_id` is set to the *output* tuple id. For aggregate functions whose intermediate state differs from the final output type (e.g. AVG, NDV sketches, etc.), using the output tuple descriptor as the intermediate tuple can break `AggFnEvaluator::prepare()` expectations and/or produce incorrect results. Set `intermediate_tuple_id` to the actual intermediate tuple id from `AggregateInfo` (and keep `output_tuple_id` as the final tuple). 
########## gensrc/thrift/PlanNodes.thrift: ########## @@ -1074,6 +1075,14 @@ struct TAggregationNode { 10: optional TSortInfo agg_sort_info_by_group_key } +struct TBucketedAggregationNode { + 1: optional list<Exprs.TExpr> grouping_exprs + 2: required list<Exprs.TExpr> aggregate_functions + 3: required Types.TTupleId intermediate_tuple_id + 4: required Types.TTupleId output_tuple_id + 5: required bool need_finalize +} Review Comment: `grouping_exprs` is declared `optional`, but the BE treats “empty grouping_exprs” as an internal error (see `pipeline_fragment_context.cpp`). If this node is never valid without GROUP BY keys, consider making `grouping_exprs` a `required list<Exprs.TExpr>` (and documenting “must be non-empty”) to encode the contract in the IDL and reduce accidental generation of invalid plans. ########## fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java: ########## @@ -2925,6 +2928,32 @@ public void setSkewRewriteAggBucketNum(int num) { checker = "checkAggPhase") public int aggPhase = 0; + @VariableMgr.VarAttr(name = ENABLE_BUCKETED_HASH_AGG, needForward = true, description = { + "是否启用 bucketed hash aggregation 优化。该优化在单 BE 场景下将两阶段聚合融合为单个算子," + + "消除 Exchange 开销和序列化/反序列化成本。默认关闭。", + "Whether to enable bucketed hash aggregation optimization. This optimization fuses two-phase " + + "aggregation into a single operator on single-BE deployments, eliminating exchange overhead " + + "and serialization/deserialization costs. Disabled by default."}) Review Comment: The variable description states “默认关闭 / Disabled by default” but the actual default is `true`, which makes this optimization enabled by default and can change query planning behavior unexpectedly. Either change the default to `false` or update both CN/EN descriptions to match the intended default (and ensure release notes / behavior-change checklist reflects it if enabling by default is intended). 
########## be/src/exec/pipeline/pipeline_fragment_context.cpp: ########## @@ -1423,6 +1425,53 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo } break; } + case TPlanNodeType::BUCKETED_AGGREGATION_NODE: { + if (tnode.bucketed_agg_node.grouping_exprs.empty()) { + return Status::InternalError( + "Bucketed aggregation node {} should not be used without group by keys", + tnode.node_id); + } + + // Create source operator (goes on the current / downstream pipeline). + op = std::make_shared<BucketedAggSourceOperatorX>(pool, tnode, next_operator_id(), descs); + RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); Review Comment: This uses `_parallel_instances` when adding the operator to the pipeline, but uses `_num_instances` when creating sink/source dependencies and shared state bookkeeping. If these differ, dependency counts can mismatch the actual number of running source/sink tasks, risking hangs (waiting on deps that will never be signaled) or premature completion. Use the same instance count consistently for operator parallelism and dependency creation (whichever is correct for this fragment/pipeline), and keep it aligned with `RuntimeState::task_num()` used later in `BucketedAggSharedState::init_instances(...)`. ########## be/src/exec/pipeline/pipeline_fragment_context.cpp: ########## @@ -1423,6 +1425,53 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo } break; } + case TPlanNodeType::BUCKETED_AGGREGATION_NODE: { + if (tnode.bucketed_agg_node.grouping_exprs.empty()) { + return Status::InternalError( + "Bucketed aggregation node {} should not be used without group by keys", + tnode.node_id); + } + + // Create source operator (goes on the current / downstream pipeline). + op = std::make_shared<BucketedAggSourceOperatorX>(pool, tnode, next_operator_id(), descs); + RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); + + // Create a new pipeline for the sink side. 
+ const auto downstream_pipeline_id = cur_pipe->id(); + if (!_dag.contains(downstream_pipeline_id)) { + _dag.insert({downstream_pipeline_id, {}}); + } + cur_pipe = add_pipeline(cur_pipe); + _dag[downstream_pipeline_id].push_back(cur_pipe->id()); + + // Create sink operator. + sink_ops.push_back(std::make_shared<BucketedAggSinkOperatorX>( + pool, next_sink_operator_id(), op->operator_id(), tnode, descs)); + RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back())); + RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get())); + + // Pre-register a single shared state for ALL instances so that every + // sink instance writes its per-instance hash table into the same + // BucketedAggSharedState and every source instance can merge across + // all of them. + { + auto shared_state = BucketedAggSharedState::create_shared(); + shared_state->id = op->operator_id(); + shared_state->related_op_ids.insert(op->operator_id()); + + for (int i = 0; i < _num_instances; i++) { + auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(), + "BUCKETED_AGG_SINK_DEPENDENCY"); + sink_dep->set_shared_state(shared_state.get()); + shared_state->sink_deps.push_back(sink_dep); + } + shared_state->create_source_dependencies(_num_instances, op->operator_id(), Review Comment: The source operator is added to the pipeline with `_parallel_instances` (see the `add_operator` call earlier in this hunk), but the sink/source dependencies and shared state bookkeeping here are created with `_num_instances`. If these differ, dependency counts can mismatch the actual number of running source/sink tasks, risking hangs (waiting on deps that will never be signaled) or premature completion. Use the same instance count consistently for operator parallelism and dependency creation (whichever is correct for this fragment/pipeline), and keep it aligned with `RuntimeState::task_num()` used later in `BucketedAggSharedState::init_instances(...)`. 
########## fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggWithoutDistinct.java: ########## @@ -165,6 +180,221 @@ public Void visitSessionVarGuardExpr(SessionVarGuardExpr expr, Map<String, Strin aggregate.getLogicalProperties(), localAgg)); } + /** + * Implements bucketed hash aggregation for single-BE deployments. + * Fuses two-phase aggregation into a single PhysicalBucketedHashAggregate operator, + * eliminating exchange overhead and serialization/deserialization costs. + * + * Only generated when: + * 1. enable_bucketed_hash_agg session variable is true + * 2. Cluster has exactly one alive BE + * 3. Aggregate has GROUP BY keys (no without-key aggregation) + * 4. Aggregate functions support two-phase execution + * 5. Data volume checks pass (min input rows, max group keys) + */ + private List<Plan> implementBucketedPhase(LogicalAggregate<? extends Plan> aggregate, ConnectContext ctx) { + if (!ctx.getSessionVariable().enableBucketedHashAgg) { + return ImmutableList.of(); + } + // Only for single-BE deployments + int beNumber = Math.max(1, ctx.getEnv().getClusterInfo().getBackendsNumber(true)); + if (beNumber != 1) { + return ImmutableList.of(); + } + // Without-key aggregation not supported in initial version + if (aggregate.getGroupByExpressions().isEmpty()) { + return ImmutableList.of(); + } + // Must support two-phase execution (same check as splitTwoPhase) + if (!aggregate.supportAggregatePhase(AggregatePhase.TWO)) { + return ImmutableList.of(); + } + // Skip aggregates with no aggregate functions (pure GROUP BY dedup). + // These are produced by DistinctAggregateRewriter as the bottom dedup phase. + if (aggregate.getAggregateFunctions().isEmpty()) { + return ImmutableList.of(); + } + // Skip aggregates whose child group contains a LogicalAggregate. + // This detects the top aggregate in a DISTINCT decomposition (e.g., + // COUNT(DISTINCT a) GROUP BY b is rewritten to COUNT(a) GROUP BY b + // on top of GROUP BY a,b dedup). 
Bucketed agg does not support + // DISTINCT aggregation in the initial version. + if (childGroupContainsAggregate(aggregate)) { + return ImmutableList.of(); + } + // Skip when data is already distributed by the GROUP BY keys + // (e.g., table bucketed by UserID, query GROUP BY UserID). + // In this case the two-phase plan needs no exchange and is strictly + // better than bucketed agg (no 256-bucket overhead, no merge phase). + if (groupByKeysSatisfyDistribution(aggregate)) { + return ImmutableList.of(); + } + // Data-volume-based checks: control bucketed agg eligibility based on + // estimated data scale, similar to ClickHouse's group_by_two_level_threshold + // and group_by_two_level_threshold_bytes. This reduces reliance on + // column-level statistics which may be inaccurate or missing. + // + // When statistics are unavailable (groupExpression absent or childStats null), + // conservatively skip bucketed agg — without data volume information we cannot + // make an informed decision, and the risk of choosing bucketed agg in a + // high-cardinality scenario outweighs the potential benefit. + if (!aggregate.getGroupExpression().isPresent()) { + return ImmutableList.of(); + } + GroupExpression ge = aggregate.getGroupExpression().get(); + Statistics childStats = ge.childStatistics(0); + if (childStats == null) { + return ImmutableList.of(); + } + double rows = childStats.getRowCount(); + long minInputRows = ctx.getSessionVariable().bucketedAggMinInputRows; + long maxGroupKeys = ctx.getSessionVariable().bucketedAggMaxGroupKeys; + + // Gate: minimum input rows. + // When input data is too small, the overhead of initializing 256 + // per-bucket hash tables and the pipelined merge phase outweighs + // the benefit of eliminating exchange. Skip bucketed agg. + if (minInputRows > 0 && rows < minInputRows) { + return ImmutableList.of(); + } + + // Gate: maximum estimated group keys (similar to ClickHouse's + // group_by_two_level_threshold). 
When the number of distinct groups + // is too large, the source-side merge must combine too many keys + // across instances, and the merge cost dominates. Skip bucketed agg. + Statistics aggStats = ge.getOwnerGroup().getStatistics(); + if (maxGroupKeys > 0 && aggStats != null && aggStats.getRowCount() > maxGroupKeys) { + return ImmutableList.of(); + } + + // High-cardinality ratio checks (existing logic). + // These complement the absolute thresholds above with relative checks: + // 1. Single-column NDV check: if ANY GROUP BY key's NDV > rows * threshold, + // the combined NDV is at least that high. + // 2. Aggregation ratio check: if estimated output rows > rows * threshold, + // merge cost dominates. + double highCardThreshold = 0.3; + for (Expression groupByKey : aggregate.getGroupByExpressions()) { + ColumnStatistic colStat = childStats.findColumnStatistics(groupByKey); + if (colStat != null && !colStat.isUnKnown() && colStat.ndv > rows * highCardThreshold) { + return ImmutableList.of(); + } + } + if (aggStats != null && aggStats.getRowCount() > rows * highCardThreshold) { + return ImmutableList.of(); + } + // Build output expressions: rewrite AggregateFunction -> AggregateExpression with GLOBAL_RESULT param + // (same as one-phase aggregation — raw input directly produces final result). + List<NamedExpression> aggOutput = ExpressionUtils.rewriteDownShortCircuit( + aggregate.getOutputExpressions(), expr -> { + if (!(expr instanceof AggregateFunction)) { + return expr; + } + return new AggregateExpression((AggregateFunction) expr, AggregateParam.GLOBAL_RESULT); + } + ); + return ImmutableList.of(new PhysicalBucketedHashAggregate<>( + aggregate.getGroupByExpressions(), aggOutput, + aggregate.getLogicalProperties(), aggregate.child())); + } + + /** + * Check if the child group of this aggregate contains a LogicalAggregate. 
+ * This is used to detect aggregates produced by DISTINCT decomposition rewrites + * (e.g., DistinctAggregateRewriter, SplitMultiDistinctStrategy), where the original + * DISTINCT aggregate is split into a top non-distinct aggregate over a bottom dedup aggregate. + */ + private boolean childGroupContainsAggregate(LogicalAggregate<? extends Plan> aggregate) { + if (!aggregate.getGroupExpression().isPresent()) { + return false; + } + GroupExpression groupExpr = aggregate.getGroupExpression().get(); + if (groupExpr.arity() == 0) { + return false; + } + Group childGroup = groupExpr.child(0); + for (GroupExpression childGroupExpr : childGroup.getLogicalExpressions()) { + if (childGroupExpr.getPlan() instanceof LogicalAggregate) { + return true; + } + } + return false; + } + + /** + * Check if the GROUP BY keys of this aggregate are a superset of (or equal to) + * the underlying OlapTable's hash distribution columns. When this is true, + * the data is already correctly partitioned for the aggregation, so the + * two-phase plan (local + global) requires no exchange and is strictly better + * than bucketed agg (no 256-bucket overhead, no merge phase). + * + * Traverses the child group in the Memo to find a LogicalOlapScan, + * walking through LogicalProject and LogicalFilter transparently. + */ + private boolean groupByKeysSatisfyDistribution(LogicalAggregate<? 
extends Plan> aggregate) { + if (!aggregate.getGroupExpression().isPresent()) { + return false; + } + GroupExpression groupExpr = aggregate.getGroupExpression().get(); + if (groupExpr.arity() == 0) { + return false; + } + OlapTable table = findOlapTableInGroup(groupExpr.child(0), 5); + if (table == null) { + return false; + } + DistributionInfo distributionInfo = table.getDefaultDistributionInfo(); + if (!(distributionInfo instanceof HashDistributionInfo)) { + return false; + } + List<Column> distributionColumns = ((HashDistributionInfo) distributionInfo).getDistributionColumns(); + if (distributionColumns.isEmpty()) { + return false; + } + // Collect GROUP BY column names (only direct SlotReference with original column info) + Set<String> groupByColumnNames = new HashSet<>(); + for (Expression expr : aggregate.getGroupByExpressions()) { + if (expr instanceof SlotReference) { + SlotReference slot = (SlotReference) expr; + if (slot.getOriginalColumn().isPresent()) { + groupByColumnNames.add(slot.getOriginalColumn().get().getName().toLowerCase()); + } + } + } + // All distribution columns must appear in the GROUP BY keys + for (Column column : distributionColumns) { + if (!groupByColumnNames.contains(column.getName().toLowerCase())) { Review Comment: `String#toLowerCase()` without an explicit locale can produce incorrect results under locale-sensitive environments (e.g. Turkish locale). Use `toLowerCase(Locale.ROOT)` (and import `java.util.Locale`) for deterministic case-folding of identifiers. ########## fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggWithoutDistinct.java: ########## @@ -165,6 +180,221 @@ public Void visitSessionVarGuardExpr(SessionVarGuardExpr expr, Map<String, Strin aggregate.getLogicalProperties(), localAgg)); } + /** + * Implements bucketed hash aggregation for single-BE deployments. 
+ * Fuses two-phase aggregation into a single PhysicalBucketedHashAggregate operator, + * eliminating exchange overhead and serialization/deserialization costs. + * + * Only generated when: + * 1. enable_bucketed_hash_agg session variable is true + * 2. Cluster has exactly one alive BE + * 3. Aggregate has GROUP BY keys (no without-key aggregation) + * 4. Aggregate functions support two-phase execution + * 5. Data volume checks pass (min input rows, max group keys) + */ + private List<Plan> implementBucketedPhase(LogicalAggregate<? extends Plan> aggregate, ConnectContext ctx) { + if (!ctx.getSessionVariable().enableBucketedHashAgg) { + return ImmutableList.of(); + } + // Only for single-BE deployments + int beNumber = Math.max(1, ctx.getEnv().getClusterInfo().getBackendsNumber(true)); Review Comment: `Math.max(1, ...)` masks the “0 alive BE” case as `1`, which can incorrectly allow `implementBucketedPhase()` to generate a bucketed-agg candidate when the cluster has no available backends. Use the real backend count directly and require it to be exactly 1. ########## fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggWithoutDistinct.java: ########## @@ -165,6 +180,221 @@ public Void visitSessionVarGuardExpr(SessionVarGuardExpr expr, Map<String, Strin aggregate.getLogicalProperties(), localAgg)); } + /** + * Implements bucketed hash aggregation for single-BE deployments. + * Fuses two-phase aggregation into a single PhysicalBucketedHashAggregate operator, + * eliminating exchange overhead and serialization/deserialization costs. + * + * Only generated when: + * 1. enable_bucketed_hash_agg session variable is true + * 2. Cluster has exactly one alive BE + * 3. Aggregate has GROUP BY keys (no without-key aggregation) + * 4. Aggregate functions support two-phase execution + * 5. Data volume checks pass (min input rows, max group keys) + */ + private List<Plan> implementBucketedPhase(LogicalAggregate<? 
extends Plan> aggregate, ConnectContext ctx) { + if (!ctx.getSessionVariable().enableBucketedHashAgg) { + return ImmutableList.of(); + } + // Only for single-BE deployments + int beNumber = Math.max(1, ctx.getEnv().getClusterInfo().getBackendsNumber(true)); + if (beNumber != 1) { + return ImmutableList.of(); + } + // Without-key aggregation not supported in initial version + if (aggregate.getGroupByExpressions().isEmpty()) { + return ImmutableList.of(); + } + // Must support two-phase execution (same check as splitTwoPhase) + if (!aggregate.supportAggregatePhase(AggregatePhase.TWO)) { + return ImmutableList.of(); + } + // Skip aggregates with no aggregate functions (pure GROUP BY dedup). + // These are produced by DistinctAggregateRewriter as the bottom dedup phase. + if (aggregate.getAggregateFunctions().isEmpty()) { + return ImmutableList.of(); + } + // Skip aggregates whose child group contains a LogicalAggregate. + // This detects the top aggregate in a DISTINCT decomposition (e.g., + // COUNT(DISTINCT a) GROUP BY b is rewritten to COUNT(a) GROUP BY b + // on top of GROUP BY a,b dedup). Bucketed agg does not support + // DISTINCT aggregation in the initial version. + if (childGroupContainsAggregate(aggregate)) { + return ImmutableList.of(); + } + // Skip when data is already distributed by the GROUP BY keys + // (e.g., table bucketed by UserID, query GROUP BY UserID). + // In this case the two-phase plan needs no exchange and is strictly + // better than bucketed agg (no 256-bucket overhead, no merge phase). + if (groupByKeysSatisfyDistribution(aggregate)) { + return ImmutableList.of(); + } + // Data-volume-based checks: control bucketed agg eligibility based on + // estimated data scale, similar to ClickHouse's group_by_two_level_threshold + // and group_by_two_level_threshold_bytes. This reduces reliance on + // column-level statistics which may be inaccurate or missing. 
+ // + // When statistics are unavailable (groupExpression absent or childStats null), + // conservatively skip bucketed agg — without data volume information we cannot + // make an informed decision, and the risk of choosing bucketed agg in a + // high-cardinality scenario outweighs the potential benefit. + if (!aggregate.getGroupExpression().isPresent()) { + return ImmutableList.of(); + } + GroupExpression ge = aggregate.getGroupExpression().get(); + Statistics childStats = ge.childStatistics(0); + if (childStats == null) { + return ImmutableList.of(); + } + double rows = childStats.getRowCount(); + long minInputRows = ctx.getSessionVariable().bucketedAggMinInputRows; + long maxGroupKeys = ctx.getSessionVariable().bucketedAggMaxGroupKeys; + + // Gate: minimum input rows. + // When input data is too small, the overhead of initializing 256 + // per-bucket hash tables and the pipelined merge phase outweighs + // the benefit of eliminating exchange. Skip bucketed agg. + if (minInputRows > 0 && rows < minInputRows) { + return ImmutableList.of(); + } + + // Gate: maximum estimated group keys (similar to ClickHouse's + // group_by_two_level_threshold). When the number of distinct groups + // is too large, the source-side merge must combine too many keys + // across instances, and the merge cost dominates. Skip bucketed agg. + Statistics aggStats = ge.getOwnerGroup().getStatistics(); + if (maxGroupKeys > 0 && aggStats != null && aggStats.getRowCount() > maxGroupKeys) { + return ImmutableList.of(); + } + + // High-cardinality ratio checks (existing logic). + // These complement the absolute thresholds above with relative checks: + // 1. Single-column NDV check: if ANY GROUP BY key's NDV > rows * threshold, + // the combined NDV is at least that high. + // 2. Aggregation ratio check: if estimated output rows > rows * threshold, + // merge cost dominates. 
+ double highCardThreshold = 0.3; + for (Expression groupByKey : aggregate.getGroupByExpressions()) { + ColumnStatistic colStat = childStats.findColumnStatistics(groupByKey); + if (colStat != null && !colStat.isUnKnown() && colStat.ndv > rows * highCardThreshold) { + return ImmutableList.of(); + } + } + if (aggStats != null && aggStats.getRowCount() > rows * highCardThreshold) { + return ImmutableList.of(); + } + // Build output expressions: rewrite AggregateFunction -> AggregateExpression with GLOBAL_RESULT param + // (same as one-phase aggregation — raw input directly produces final result). + List<NamedExpression> aggOutput = ExpressionUtils.rewriteDownShortCircuit( + aggregate.getOutputExpressions(), expr -> { + if (!(expr instanceof AggregateFunction)) { + return expr; + } + return new AggregateExpression((AggregateFunction) expr, AggregateParam.GLOBAL_RESULT); + } + ); + return ImmutableList.of(new PhysicalBucketedHashAggregate<>( + aggregate.getGroupByExpressions(), aggOutput, + aggregate.getLogicalProperties(), aggregate.child())); + } + + /** + * Check if the child group of this aggregate contains a LogicalAggregate. + * This is used to detect aggregates produced by DISTINCT decomposition rewrites + * (e.g., DistinctAggregateRewriter, SplitMultiDistinctStrategy), where the original + * DISTINCT aggregate is split into a top non-distinct aggregate over a bottom dedup aggregate. + */ + private boolean childGroupContainsAggregate(LogicalAggregate<? 
extends Plan> aggregate) { + if (!aggregate.getGroupExpression().isPresent()) { + return false; + } + GroupExpression groupExpr = aggregate.getGroupExpression().get(); + if (groupExpr.arity() == 0) { + return false; + } + Group childGroup = groupExpr.child(0); + for (GroupExpression childGroupExpr : childGroup.getLogicalExpressions()) { + if (childGroupExpr.getPlan() instanceof LogicalAggregate) { + return true; + } + } + return false; + } + + /** + * Check if the GROUP BY keys of this aggregate are a superset of (or equal to) + * the underlying OlapTable's hash distribution columns. When this is true, + * the data is already correctly partitioned for the aggregation, so the + * two-phase plan (local + global) requires no exchange and is strictly better + * than bucketed agg (no 256-bucket overhead, no merge phase). + * + * Traverses the child group in the Memo to find a LogicalOlapScan, + * walking through LogicalProject and LogicalFilter transparently. + */ + private boolean groupByKeysSatisfyDistribution(LogicalAggregate<? 
extends Plan> aggregate) { + if (!aggregate.getGroupExpression().isPresent()) { + return false; + } + GroupExpression groupExpr = aggregate.getGroupExpression().get(); + if (groupExpr.arity() == 0) { + return false; + } + OlapTable table = findOlapTableInGroup(groupExpr.child(0), 5); + if (table == null) { + return false; + } + DistributionInfo distributionInfo = table.getDefaultDistributionInfo(); + if (!(distributionInfo instanceof HashDistributionInfo)) { + return false; + } + List<Column> distributionColumns = ((HashDistributionInfo) distributionInfo).getDistributionColumns(); + if (distributionColumns.isEmpty()) { + return false; + } + // Collect GROUP BY column names (only direct SlotReference with original column info) + Set<String> groupByColumnNames = new HashSet<>(); + for (Expression expr : aggregate.getGroupByExpressions()) { + if (expr instanceof SlotReference) { + SlotReference slot = (SlotReference) expr; + if (slot.getOriginalColumn().isPresent()) { + groupByColumnNames.add(slot.getOriginalColumn().get().getName().toLowerCase()); Review Comment: `String#toLowerCase()` without an explicit locale can produce incorrect results under locale-sensitive environments (e.g. Turkish locale). Use `toLowerCase(Locale.ROOT)` (and import `java.util.Locale`) for deterministic case-folding of identifiers. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
