github-actions[bot] commented on code in PR #65024:
URL: https://github.com/apache/doris/pull/65024#discussion_r3498426514
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/util/AggregateUtils.java:
##########
@@ -191,4 +193,46 @@ public static boolean isOrderKeysMatchGroupKeys(List<OrderKey> orderKeys,
}
return true;
}
+
+ /**
+ * Check the basic environmental conditions for bucketed hash aggregation.
+ * This is the shared eligibility gate used by ChildrenPropertiesRegulator
+ * (to allow the one-phase-GLOBAL+distribute pattern), CostModel (for cost
+ * discount), and PhysicalPlanTranslator (for fusion into BucketedAggregationNode).
+ *
+ * @return true if the session variable is enabled, there is exactly one alive BE,
+ * no smooth upgrade is in progress, and the aggregate has GROUP BY keys.
+ */
+ public static boolean isBucketedHashAggEnabled(int groupByExprCount) {
+ ConnectContext ctx = ConnectContext.get();
+ if (ctx == null) {
+ return false;
+ }
+ if (!ctx.getSessionVariable().enableBucketedHashAgg) {
+ return false;
+ }
+ // Must have GROUP BY keys (without-key aggregation not supported)
+ if (groupByExprCount == 0) {
+ return false;
+ }
+ // Correctness gate: single-BE only (cross-BE in-memory merge is impossible).
+ // Use be_number_for_test first (set by regression tests), fall back to real cluster count.
+ int beNumber = ctx.getSessionVariable().getBeNumberForTest();
+ if (beNumber < 0) {
+ beNumber = Math.max(1, ctx.getEnv().getClusterInfo().getBackendsNumber(true));
+ }
+ if (beNumber != 1) {
+ return false;
+ }
+ // Smooth upgrade safety net: old BE processes do not recognize
+ // BUCKETED_AGGREGATION_NODE plan node type
+ SystemInfoService clusterInfo = ctx.getEnv().getClusterInfo();
+ for (Long beId : clusterInfo.getAllBackendByCurrentCluster(true)) {
+ Backend be = clusterInfo.getBackend(beId);
+ if (be != null && be.isSmoothUpgradeSrc()) {
+ return false;
+ }
+ }
Review Comment:
This helper is now used as the real fusion gate, but it dropped the
data-volume gates that used to decide whether bucketed aggregation was allowed.
After this refactor, `bucketedAggMaxGroupKeys` and
`bucketedAggHighCardThreshold` are no longer read by planner code at all, and
`bucketedAggMinInputRows` only affects a cost discount; it does not stop
`shouldUseBucketedFusion` from replacing a selected one-phase aggregate. That
means settings such as `bucketed_agg_max_group_keys=1` or the default
high-cardinality threshold can still produce a `BUCKETED AGGREGATE` plan.
Please move those checks into the translator eligibility path, or remove/change
the variables with compatibility coverage.
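A minimal sketch of folding the data-volume checks back into the fusion decision, assuming the settings keep their current names. The semantics are assumptions, not read from the Doris source: `bucketedAggMaxGroupKeys` is treated as an upper bound on the number of GROUP BY keys, `bucketedAggMinInputRows` as a lower bound on the estimated input row count, and `bucketedAggHighCardThreshold` as an upper bound on the estimated number of groups. `DataVolumeGate`, `passesDataVolumeGates`, and `shouldFuse` are hypothetical names; real planner code would take the estimates from the aggregate's statistics.

```java
/**
 * Hypothetical sketch: data-volume gates for bucketed hash aggregation.
 * Assumed semantics (not taken from the Doris source):
 *  - maxGroupKeys:      reject when the GROUP BY has more keys than this
 *  - minInputRows:      reject when the estimated input is smaller than this
 *  - highCardThreshold: reject when the estimated number of groups exceeds this
 */
final class DataVolumeGate {
    private DataVolumeGate() {
    }

    static boolean passesDataVolumeGates(int groupByKeyCount,
                                         double estimatedInputRows,
                                         double estimatedGroups,
                                         int maxGroupKeys,
                                         long minInputRows,
                                         long highCardThreshold) {
        if (groupByKeyCount > maxGroupKeys) {
            return false; // too many GROUP BY keys for the bucketed path
        }
        if (estimatedInputRows < minInputRows) {
            return false; // small input: keep the regular aggregate
        }
        // Assumed: high-cardinality group sets stay on the regular plan.
        return estimatedGroups <= highCardThreshold;
    }

    /** Fuse only when the environment gate AND the data-volume gates both pass. */
    static boolean shouldFuse(boolean environmentEligible,
                              int groupByKeyCount,
                              double estimatedInputRows,
                              double estimatedGroups,
                              int maxGroupKeys,
                              long minInputRows,
                              long highCardThreshold) {
        return environmentEligible
                && passesDataVolumeGates(groupByKeyCount, estimatedInputRows, estimatedGroups,
                        maxGroupKeys, minInputRows, highCardThreshold);
    }
}
```

Under these assumptions, `passesDataVolumeGates(2, 1_000_000, 100, 1, 1000, 10_000)` returns `false`, so a two-key GROUP BY with `bucketed_agg_max_group_keys=1` would no longer be fused. Routing every caller through one combined check keeps the translator from fusing a plan that the cost model would have rejected.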
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java:
##########
@@ -3068,9 +2988,125 @@ private PlanFragment connectJoinNode(HashJoinNode hashJoinNode, PlanFragment lef
return leftFragment;
}
+ /**
+ * Check whether the one-phase GLOBAL hash aggregate can be fused with its
+ * distribute child into a BucketedAggregationNode. This eliminates exchange
+ * overhead on single-BE deployments by using in-memory per-bucket merging.
+ */
+ private boolean shouldUseBucketedFusion(PhysicalHashAggregate<? extends Plan> aggregate) {
+ // Shared eligibility: session var, single-BE, GROUP BY, smooth upgrade
+ if (!AggregateUtils.isBucketedHashAggEnabled(aggregate.getGroupByExpressions().size())) {
+ return false;
+ }
+ // Must be one-phase: GLOBAL + INPUT_TO_RESULT
+ if (aggregate.getAggPhase() != AggPhase.GLOBAL
+ || aggregate.getAggMode() != AggMode.INPUT_TO_RESULT) {
+ return false;
+ }
+ // Child must be PhysicalDistribute with hash distribution matching group keys
+ Plan child = aggregate.child(0);
+ if (!(child instanceof PhysicalDistribute)) {
+ return false;
+ }
+ DistributionSpec distSpec = ((PhysicalDistribute<?>) child).getDistributionSpec();
+ if (!(distSpec instanceof DistributionSpecHash)) {
+ return false;
+ }
+ List<ExprId> distKeys = ((DistributionSpecHash) distSpec).getOrderedShuffledColumns();
+ List<ExprId> groupByKeys = aggregate.getGroupByExpressions().stream()
+ .filter(SlotReference.class::isInstance)
+ .map(SlotReference.class::cast)
+ .map(SlotReference::getExprId)
+ .collect(Collectors.toList());
+ return distKeys.equals(groupByKeys);
+ }
+
+ /**
+ * Fuse a one-phase GLOBAL hash aggregate and its PhysicalDistribute child
+ * into a BucketedAggregationNode, skipping the exchange node entirely.
+ * Visits the distribute's child directly to keep everything in one fragment.
+ */
+ private PlanFragment visitBucketedFusion(
+ PhysicalHashAggregate<? extends Plan> aggregate,
+ PlanTranslatorContext context) {
+ // Visit the distribute's direct child, bypassing the distribute entirely.
+ // This avoids creating an ExchangeNode that bucketed agg does not need.
+ Plan distributeChild = aggregate.child(0).child(0);
+ PlanFragment inputPlanFragment = distributeChild.accept(this, context);
+
+ List<Expression> groupByExpressions = aggregate.getGroupByExpressions();
+ List<NamedExpression> outputExpressions = aggregate.getOutputExpressions();
+
+ // 1. generate slot reference for each group expression
+ List<SlotReference> groupSlots = collectGroupBySlots(groupByExpressions, outputExpressions);
+ ArrayList<Expr> execGroupingExpressions = translateGroupByExprs(groupByExpressions, context);
+
+ // 2. collect agg expressions and generate agg function to slot reference map
+ List<Slot> aggFunctionOutput = Lists.newArrayList();
+ ArrayList<FunctionCallExpr> execAggregateFunctions =
+ Lists.newArrayListWithCapacity(outputExpressions.size());
+ Set<AggregateExpression> processedAggregateExpressions = Sets.newIdentityHashSet();
+ for (NamedExpression o : outputExpressions) {
+ if (o.containsType(AggregateExpression.class)) {
+ aggFunctionOutput.add(o.toSlot());
+ o.foreach(c -> {
+ if (c instanceof SessionVarGuardExpr) {
+ SessionVarGuardExpr guardExpr = (SessionVarGuardExpr) c;
+ if (guardExpr.child() instanceof AggregateExpression) {
+ AggregateExpression ae = (AggregateExpression) guardExpr.child();
+ if (processedAggregateExpressions.add(ae)) {
+ execAggregateFunctions.add(
+ (FunctionCallExpr) ExpressionTranslator.translate(guardExpr, context));
+ }
+ }
+ return true;
+ }
+ if (c instanceof AggregateExpression) {
+ AggregateExpression ae = (AggregateExpression) c;
+ if (processedAggregateExpressions.add(ae)) {
+ execAggregateFunctions.add(
+ (FunctionCallExpr) ExpressionTranslator.translate(ae, context));
+ }
+ return true;
+ }
+ return false;
+ });
+ }
+ }
+
+ // 3. generate output tuple
+ Pair<TupleDescriptor, List<Integer>> tupleAndIds =
+ buildAggOutputTuple(groupSlots, aggFunctionOutput, context);
+ TupleDescriptor outputTupleDesc = tupleAndIds.first;
+ List<Integer> aggFunOutputIds = tupleAndIds.second;
+
+ // Bucketed agg uses AggPhase.FIRST (update semantics): raw input -> final result.
+ // Not partial — always needsFinalize.
+ AggregateInfo aggInfo = AggregateInfo.create(execGroupingExpressions, execAggregateFunctions,
+ aggFunOutputIds, false /* isPartial */, outputTupleDesc,
+ AggregateInfo.AggPhase.FIRST);
+
+ BucketedAggregationNode bucketedAggNode = new BucketedAggregationNode(
+ context.nextPlanNodeId(), inputPlanFragment.getPlanRoot(), aggInfo, true);
+
+ bucketedAggNode.setNereidsId(aggregate.getId());
+ context.getNereidsIdToPlanNodeIdMap().put(aggregate.getId(), bucketedAggNode.getId());
+
+ // Do NOT set hasColocatePlanNode: bucketed agg uses its own hash-based
Review Comment:
This fusion changes the executable distribution without changing the
optimizer-visible property. The selected node is still a
`PhysicalHashAggregate`, so
`ChildOutputPropertyDeriver.visitPhysicalHashAggregate` advertises its child's
hash distribution to parents. The old dedicated bucketed node avoided that by
returning `PhysicalProperties.ANY` because the bucketed operator's 256-bucket
hash is not shuffle-compatible. Here we bypass the `PhysicalDistribute` and
install a `BucketedAggregationNode`, so a parent hash join/aggregate can
believe the child is hash-distributed on the group key and skip an exchange
even though the runtime node no longer provides that distribution. Please keep
the bucketed shape visible to property derivation, or skip/add enforcement when
a parent relies on the hash distribution.
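A toy model of the first option the comment suggests, keeping the bucketed shape visible to property derivation. It is not Doris's `PhysicalProperties` or `ChildOutputPropertyDeriver` API: `DerivedDistribution`, `AggPropertySketch`, and their methods are hypothetical, a distribution is reduced to an ordered list of hash-key names, and the sketch assumes the fusion decision is already known when properties are derived.

```java
import java.util.List;

/** Hypothetical, simplified model of a derived output distribution. */
final class DerivedDistribution {
    /** No usable hash distribution (the analogue of PhysicalProperties.ANY). */
    static final DerivedDistribution ANY = new DerivedDistribution(List.of());

    final List<String> hashKeys; // empty means "no usable hash distribution"

    private DerivedDistribution(List<String> hashKeys) {
        this.hashKeys = List.copyOf(hashKeys);
    }

    static DerivedDistribution hashOn(List<String> keys) {
        return new DerivedDistribution(keys);
    }

    boolean satisfiesHash(List<String> requiredKeys) {
        return !hashKeys.isEmpty() && hashKeys.equals(requiredKeys);
    }
}

final class AggPropertySketch {
    private AggPropertySketch() {
    }

    /**
     * If the aggregate will be fused into a bucketed node, the child's hash
     * distribution no longer exists at runtime, so advertise ANY (as the comment
     * says the old dedicated bucketed node did) instead of forwarding it.
     */
    static DerivedDistribution deriveAggOutput(DerivedDistribution child, boolean willFuseBucketed) {
        return willFuseBucketed ? DerivedDistribution.ANY : child;
    }

    /** A parent that needs hash(keys) must add an exchange unless the child already satisfies it. */
    static boolean parentNeedsExchange(DerivedDistribution childOutput, List<String> requiredKeys) {
        return !childOutput.satisfiesHash(requiredKeys);
    }
}
```

With `willFuseBucketed = false` the aggregate forwards hash(k) and a parent requiring hash(k) skips the exchange; with `true` it derives ANY, so the parent is forced to insert one. The design point is that whatever decides fusion must be consulted before, not after, property derivation.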
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java:
##########
@@ -3068,9 +2988,125 @@ private PlanFragment connectJoinNode(HashJoinNode hashJoinNode, PlanFragment lef
return leftFragment;
}
+ /**
+ * Check whether the one-phase GLOBAL hash aggregate can be fused with its
+ * distribute child into a BucketedAggregationNode. This eliminates exchange
+ * overhead on single-BE deployments by using in-memory per-bucket merging.
Review Comment:
This fusion gate no longer preserves the old `supportAggregatePhase(TWO)`
exclusion. A grouped `GROUP_CONCAT(... ORDER BY ...)` with
`useOnePhaseAggForGroupConcatWithOrder` is a concrete one-phase-only case: it
can produce a `GLOBAL INPUT_TO_RESULT` hash aggregate that reaches this method,
but the bucketed path cannot transmit aggregate ordering metadata. Regular
`AggregationNode` fills `agg_sort_infos`; `TBucketedAggregationNode` has no
such field, so fusing this plan drops the aggregate `ORDER BY` contract. Please
keep one-phase-only/order-sensitive aggregates out of bucketed fusion, or add
equivalent sort-info support to the bucketed node.
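A minimal sketch of keeping order-sensitive aggregates out of bucketed fusion. `AggCall` and `OrderSensitiveGate` are hypothetical stand-ins for the Nereids aggregate expressions, with an aggregate's ORDER BY reduced to a list of key names. The check covers only the concrete case the comment names; restoring the broader `supportAggregatePhase(TWO)` exclusion would also catch other one-phase-only aggregates.

```java
import java.util.List;

/** Hypothetical, simplified view of one aggregate call in the output list. */
final class AggCall {
    final String functionName;
    final List<String> orderByKeys; // empty when the call has no ORDER BY

    AggCall(String functionName, List<String> orderByKeys) {
        this.functionName = functionName;
        this.orderByKeys = List.copyOf(orderByKeys);
    }
}

final class OrderSensitiveGate {
    private OrderSensitiveGate() {
    }

    /**
     * Returns false when any aggregate carries an ORDER BY (for example
     * GROUP_CONCAT(v ORDER BY v)), since the bucketed node has no field
     * equivalent to agg_sort_infos to carry that ordering.
     */
    static boolean bucketedFusionAllowed(List<AggCall> aggCalls) {
        return aggCalls.stream().allMatch(call -> call.orderByKeys.isEmpty());
    }
}
```

A plan with only `sum(v)` passes, while adding `group_concat(v ORDER BY v)` makes the gate return `false`, so the aggregate stays on the regular `AggregationNode` path that fills `agg_sort_infos`.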
##########
regression-test/data/nereids_rules_p0/agg_strategy/bucketed_hash_agg.out:
##########
@@ -0,0 +1,41 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !bucketed_shape --
+PhysicalResultSink
+--PhysicalDistribute[DistributionSpecGather]
+----hashAgg[GLOBAL]
+------PhysicalDistribute[DistributionSpecHash]
+--------PhysicalProject
+----------PhysicalOlapScan[bucketed_agg_reg_test]
+
+-- !bucketed_result --
+a 90
+b 240
+c 220
+
+-- !multi_be_shape --
+PhysicalResultSink
+--PhysicalDistribute[DistributionSpecGather]
+----hashAgg[GLOBAL]
+------PhysicalDistribute[DistributionSpecHash]
+--------hashAgg[LOCAL]
+----------PhysicalProject
+------------PhysicalOlapScan[bucketed_agg_reg_test]
+
+-- !multi_be_result --
+a 90
+b 240
+c 220
+
+-- !disabled_result --
+a 90
+b 240
+c 220
+
+-- !no_group_by_result --
+550
+
+-- !count_distinct_result --
+a 4 200
+b 5 370
+c 4 340
+
Review Comment:
This added blank line at EOF fails the standard whitespace gate (`git diff
--check` reports `new blank line at EOF` for this file). Please regenerate or
trim the `.out` file so the expected output ends at the last result line.
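If regenerating is inconvenient, trimming can be done mechanically. The sketch below is a hypothetical helper (`OutFileTrimmer` is not part of the Doris regression framework): it removes trailing line terminators and re-adds exactly one `\n`, so the file ends at its last result line. It deliberately leaves trailing tabs and spaces alone, on the assumption that a trailing tab in a result row can encode an empty last column.

```java
/** Hypothetical helper: make a regression .out file end at its last result line. */
final class OutFileTrimmer {
    private OutFileTrimmer() {
    }

    /**
     * Drops trailing '\n' / '\r' characters and appends exactly one '\n'.
     * Trailing tabs or spaces on the last line are kept, because they may be data.
     */
    static String trimTrailingBlankLines(String content) {
        int end = content.length();
        while (end > 0) {
            char c = content.charAt(end - 1);
            if (c == '\n' || c == '\r') {
                end--;
            } else {
                break;
            }
        }
        // An all-blank file becomes empty rather than a lone newline.
        return end == 0 ? "" : content.substring(0, end) + "\n";
    }
}
```

Applied to the quoted file's content, `"...c\t4\t340\n\n"` becomes `"...c\t4\t340\n"`, which is the shape the whitespace check accepts.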
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.