englefly commented on code in PR #65024:
URL: https://github.com/apache/doris/pull/65024#discussion_r3503116492
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java:
##########
@@ -3068,9 +2988,125 @@ private PlanFragment connectJoinNode(HashJoinNode
hashJoinNode, PlanFragment lef
return leftFragment;
}
+ /**
+ * Check whether the one-phase GLOBAL hash aggregate can be fused with its
+ * distribute child into a BucketedAggregationNode. This eliminates exchange
+ * overhead on single-BE deployments by using in-memory per-bucket merging.
+ *
+ * <p>Fusion is accepted only when every check below passes:
+ * the bucketed hash agg feature gate is on for this number of group-by
+ * expressions, the aggregate is GLOBAL / INPUT_TO_RESULT, its first child is
+ * a PhysicalDistribute carrying a DistributionSpecHash, and the ordered
+ * shuffle columns are equal (same elements, same order) to the ExprIds of
+ * the group-by slots.
+ *
+ * @param aggregate the physical hash aggregate being considered for fusion
+ * @return true if the aggregate and its distribute child may be fused,
+ *         false as soon as any of the checks fails
+ */
+ private boolean shouldUseBucketedFusion(PhysicalHashAggregate<? extends
Plan> aggregate) {
+ // Shared eligibility: session var, single-BE, GROUP BY, smooth upgrade
+ if
(!AggregateUtils.isBucketedHashAggEnabled(aggregate.getGroupByExpressions().size()))
{
+ return false;
+ }
+ // Must be one-phase: GLOBAL + INPUT_TO_RESULT
+ if (aggregate.getAggPhase() != AggPhase.GLOBAL
+ || aggregate.getAggMode() != AggMode.INPUT_TO_RESULT) {
+ return false;
+ }
+ // Child must be PhysicalDistribute with hash distribution matching group keys
+ Plan child = aggregate.child(0);
+ if (!(child instanceof PhysicalDistribute)) {
+ return false;
+ }
+ DistributionSpec distSpec = ((PhysicalDistribute<?>)
child).getDistributionSpec();
+ if (!(distSpec instanceof DistributionSpecHash)) {
+ return false;
+ }
+ List<ExprId> distKeys = ((DistributionSpecHash)
distSpec).getOrderedShuffledColumns();
+ // NOTE(review): group-by expressions that are not SlotReference are dropped
+ // here, so they take no part in the comparison below - confirm this is intended.
+ List<ExprId> groupByKeys = aggregate.getGroupByExpressions().stream()
+ .filter(SlotReference.class::isInstance)
+ .map(SlotReference.class::cast)
+ .map(SlotReference::getExprId)
+ .collect(Collectors.toList());
+ // List.equals is order-sensitive: the keys must match element by element.
+ return distKeys.equals(groupByKeys);
+ }
+
+ /**
+ * Fuse a one-phase GLOBAL hash aggregate and its PhysicalDistribute child
+ * into a BucketedAggregationNode, skipping the exchange node entirely.
+ * Visits the distribute's child directly to keep everything in one fragment.
+ */
+ private PlanFragment visitBucketedFusion(
+ PhysicalHashAggregate<? extends Plan> aggregate,
+ PlanTranslatorContext context) {
+ // Visit the distribute's direct child, bypassing the distribute entirely.
+ // This avoids creating an ExchangeNode that bucketed agg does not need.
+ Plan distributeChild = aggregate.child(0).child(0);
+ PlanFragment inputPlanFragment = distributeChild.accept(this, context);
+
+ List<Expression> groupByExpressions =
aggregate.getGroupByExpressions();
+ List<NamedExpression> outputExpressions =
aggregate.getOutputExpressions();
+
+ // 1. generate slot reference for each group expression
+ List<SlotReference> groupSlots =
collectGroupBySlots(groupByExpressions, outputExpressions);
+ ArrayList<Expr> execGroupingExpressions =
translateGroupByExprs(groupByExpressions, context);
+
+ // 2. collect agg expressions and generate agg function to slot
reference map
+ List<Slot> aggFunctionOutput = Lists.newArrayList();
+ ArrayList<FunctionCallExpr> execAggregateFunctions =
+ Lists.newArrayListWithCapacity(outputExpressions.size());
+ Set<AggregateExpression> processedAggregateExpressions =
Sets.newIdentityHashSet();
+ for (NamedExpression o : outputExpressions) {
+ if (o.containsType(AggregateExpression.class)) {
+ aggFunctionOutput.add(o.toSlot());
+ o.foreach(c -> {
+ if (c instanceof SessionVarGuardExpr) {
+ SessionVarGuardExpr guardExpr = (SessionVarGuardExpr)
c;
+ if (guardExpr.child() instanceof AggregateExpression) {
+ AggregateExpression ae = (AggregateExpression)
guardExpr.child();
+ if (processedAggregateExpressions.add(ae)) {
+ execAggregateFunctions.add(
+ (FunctionCallExpr)
ExpressionTranslator.translate(guardExpr, context));
+ }
+ }
+ return true;
+ }
+ if (c instanceof AggregateExpression) {
+ AggregateExpression ae = (AggregateExpression) c;
+ if (processedAggregateExpressions.add(ae)) {
+ execAggregateFunctions.add(
+ (FunctionCallExpr)
ExpressionTranslator.translate(ae, context));
+ }
+ return true;
+ }
+ return false;
+ });
+ }
+ }
+
+ // 3. generate output tuple
+ Pair<TupleDescriptor, List<Integer>> tupleAndIds =
+ buildAggOutputTuple(groupSlots, aggFunctionOutput, context);
+ TupleDescriptor outputTupleDesc = tupleAndIds.first;
+ List<Integer> aggFunOutputIds = tupleAndIds.second;
+
+ // Bucketed agg uses AggPhase.FIRST (update semantics): raw input ->
final result.
+ // Not partial — always needsFinalize.
+ AggregateInfo aggInfo = AggregateInfo.create(execGroupingExpressions,
execAggregateFunctions,
+ aggFunOutputIds, false /* isPartial */, outputTupleDesc,
+ AggregateInfo.AggPhase.FIRST);
+
+ BucketedAggregationNode bucketedAggNode = new BucketedAggregationNode(
+ context.nextPlanNodeId(), inputPlanFragment.getPlanRoot(),
aggInfo, true);
+
+ bucketedAggNode.setNereidsId(aggregate.getId());
+ context.getNereidsIdToPlanNodeIdMap().put(aggregate.getId(),
bucketedAggNode.getId());
+
+ // Do NOT set hasColocatePlanNode — bucketed agg uses its own
hash-based
Review Comment:
review1
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]