This is an automated email from the ASF dual-hosted git repository.

huajianlan pushed a commit to branch fe_local_shuffle
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 5f5866cb96951ae20315bcac81a21da714ed47cd
Author: 924060929 <[email protected]>
AuthorDate: Tue Mar 31 14:33:50 2026 +0800

[fix](local shuffle) StreamingAgg: use child.isSerialOperator() not fragment.useSerialSource()

For streaming pre-aggregation, AggregationNode.enforceAndDeriveLocalExchange()
used fragment.useSerialSource() to decide whether to require PASSTHROUGH from
its child. This is a fragment-level flag: it stays true for the whole fragment,
even after local exchanges have been inserted below the aggregation.

When the plan is AGG → outer_NLJ → inner_NLJ → OlapScan(pooling), the outer NLJ
already provides ADAPTIVE_PASSTHROUGH (its probe-side exchange was inserted by
forceEnforceChildExchange). FE nevertheless inserted an extra PASSTHROUGH
exchange between StreamingAgg and the outer NLJ, which showed up as 5 extra
profile entries compared with BE native.

The correct check mirrors BE StreamingAggOperatorX::required_data_distribution():

    return _child->is_serial_operator() ? PASSTHROUGH : NOOP

In BE, _child is the IMMEDIATE child in the current pipeline. After an APT
(ADAPTIVE_PASSTHROUGH) exchange is inserted for the NLJ, the new pipeline has
_use_serial_source=false, so StreamingAgg sees the NLJ (which is not serial) as
its child and returns NOOP.

Fix: add a children.get(0).isSerialOperator() guard so that PASSTHROUGH is
required only when the direct child is actually serial (e.g. a pooling
OlapScan). For an INNER/LEFT NLJ child, isSerialOperator() returns false, so
the rule falls through to noRequire() and no spurious PASSTHROUGH is inserted.

This moves nested_nlj_pooling_scan and agg_after_nlj_pooling_scan from
knownDiff to MATCH; all 55/55 test cases now match.
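The Java sketch below is a simplified, hypothetical model of the decision described in the commit message, not Doris code. `Node`, `Require`, `oldRule`, `newRule`, `beRule` and the `forceHashJoinPassthrough` parameter are illustrative names only; the real FE works with `PlanNode`, `LocalExchangeTypeRequire` and the session variable `enableStreamingAggHashJoinForcePassthrough`. Each node carries just the two properties the rule inspects. The sketch evaluates the old FE condition, the new FE condition and the quoted BE rule for the AGG → outer_NLJ → inner_NLJ → OlapScan(pooling) plan, assuming the fragment-level serial-source flag is true and that the NLJ reports itself as non-serial, as the message states for INNER/LEFT joins.

```java
import java.util.List;

/**
 * Hypothetical model of the StreamingAgg local-exchange requirement.
 * None of these types are real Doris classes.
 */
public class StreamingAggLocalExchangeSketch {

    /** Stand-in for the FE requirement; NO_REQUIRE corresponds to BE's NOOP. */
    enum Require { PASSTHROUGH, NO_REQUIRE }

    /** Minimal plan node: only the properties the rule looks at, plus children for shape. */
    record Node(String name, boolean isHashJoin, boolean isSerialOperator, List<Node> children) {
        static Node leaf(String name, boolean serial) {
            return new Node(name, false, serial, List.of());
        }

        static Node of(String name, boolean serial, Node child) {
            return new Node(name, false, serial, List.of(child));
        }
    }

    /** Old FE rule: the fragment-level flag alone decides. */
    static Require oldRule(Node child, boolean forceHashJoinPassthrough, boolean fragmentUsesSerialSource) {
        if (child.isHashJoin() && forceHashJoinPassthrough) {
            return Require.PASSTHROUGH;
        } else if (fragmentUsesSerialSource) {
            return Require.PASSTHROUGH;
        }
        return Require.NO_REQUIRE;
    }

    /** New FE rule: the IMMEDIATE child must also be serial. */
    static Require newRule(Node child, boolean forceHashJoinPassthrough, boolean fragmentUsesSerialSource) {
        if (child.isHashJoin() && forceHashJoinPassthrough) {
            return Require.PASSTHROUGH;
        } else if (fragmentUsesSerialSource && child.isSerialOperator()) {
            return Require.PASSTHROUGH;
        }
        return Require.NO_REQUIRE;
    }

    /** BE rule as quoted in the commit: _child->is_serial_operator() ? PASSTHROUGH : NOOP. */
    static Require beRule(Node child) {
        return child.isSerialOperator() ? Require.PASSTHROUGH : Require.NO_REQUIRE;
    }

    public static void main(String[] args) {
        // AGG -> outer_NLJ -> inner_NLJ -> OlapScan(pooling); the fragment uses a serial source.
        Node scan = Node.leaf("OlapScan(pooling)", true);
        Node innerNlj = Node.of("inner_NLJ", false, scan);
        Node outerNlj = Node.of("outer_NLJ", false, innerNlj);
        boolean fragmentSerial = true;

        System.out.println("old: " + oldRule(outerNlj, false, fragmentSerial)); // PASSTHROUGH (spurious)
        System.out.println("new: " + newRule(outerNlj, false, fragmentSerial)); // NO_REQUIRE
        System.out.println("BE:  " + beRule(outerNlj));                          // NO_REQUIRE

        // Directly above a pooling scan, the new rule still requires PASSTHROUGH.
        System.out.println("over scan: " + newRule(scan, false, fragmentSerial)); // PASSTHROUGH
    }
}
```

In this model the fragment-level flag is necessary but no longer sufficient: only a serial immediate child triggers PASSTHROUGH, which is why the new rule agrees with the BE rule for the NLJ case while still covering a StreamingAgg placed directly on a pooling scan.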

---
 .../main/java/org/apache/doris/planner/AggregationNode.java | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
index fd2f67dfa49..3b1aa65230f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
@@ -297,7 +297,15 @@ public class AggregationNode extends PlanNode {
             if (children.get(0) instanceof HashJoinNode
                     && sessionVariable.enableStreamingAggHashJoinForcePassthrough) {
                 requireChild = LocalExchangeTypeRequire.requirePassthrough();
-            } else if (fragment != null && fragment.useSerialSource(connectContext)) {
+            } else if (fragment != null && fragment.useSerialSource(connectContext)
+                    && children.get(0).isSerialOperator()) {
+                // Mirrors BE StreamingAggOperatorX::required_data_distribution():
+                //     return _child->is_serial_operator() ? PASSTHROUGH : NOOP
+                // Check the IMMEDIATE child's serialness (e.g. pooling OlapScan → serial=true,
+                // NLJ (INNER/LEFT) → serial=false). Using fragment.useSerialSource() alone
+                // over-fires when NLJ exchanges have been inserted below — in BE, after an
+                // APT exchange is inserted, the new pipeline has _use_serial_source=false,
+                // so StreamingAgg returns NOOP in that pipeline.
                 requireChild = LocalExchangeTypeRequire.requirePassthrough();
             } else {
                 requireChild = LocalExchangeTypeRequire.noRequire();
