This is an automated email from the ASF dual-hosted git repository.

huajianlan pushed a commit to branch fe_local_shuffle
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 5f5866cb96951ae20315bcac81a21da714ed47cd
Author: 924060929 <[email protected]>
AuthorDate: Tue Mar 31 14:33:50 2026 +0800

    [fix](local shuffle) StreamingAgg: use child.isSerialOperator() not fragment.useSerialSource()
    
    AggregationNode.enforceAndDeriveLocalExchange() for streaming preagg was
    using fragment.useSerialSource() to decide whether to require PASSTHROUGH
    from child.  This is a fragment-level flag that stays true for the whole
    fragment even after local exchanges have been inserted below.
    
    When the plan is AGG → outer_NLJ → inner_NLJ → OlapScan(pooling), the outer
    NLJ already provides ADAPTIVE_PASSTHROUGH (its probe-side exchange was
    inserted by forceEnforceChildExchange).  FE nevertheless inserted an extra
    PASSTHROUGH between StreamingAgg and the outer NLJ, producing 5 more
    profile entries than the BE-native plan.
    
    The correct check mirrors BE StreamingAggOperatorX::required_data_distribution():
      return _child->is_serial_operator() ? PASSTHROUGH : NOOP
    
    In BE, _child is the IMMEDIATE child in the current pipeline.  After an
    ADAPTIVE_PASSTHROUGH (APT) exchange is inserted for the NLJ, the new pipeline
    has _use_serial_source=false, so StreamingAgg sees the NLJ (not serial) and
    returns NOOP.
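The difference between the fragment-level flag and the per-pipeline flag can be illustrated with a toy model. This is a sketch under stated assumptions, not the BE implementation: it assumes a fragment is cut into pipelines at each local exchange and that only the bottom pipeline (the one reading the serial source) keeps the serial flag. The `Pipeline` class and the `LE`, `LE_SINK`, `LE_SOURCE` markers are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: split an operator chain into pipelines at local exchanges.
// Assumption: only the pipeline that reads the fragment's serial source is serial.
final class PipelineSplitSketch {

    static final class Pipeline {
        final List<String> operators;
        final boolean useSerialSource;

        Pipeline(List<String> operators, boolean useSerialSource) {
            this.operators = operators;
            this.useSerialSource = useSerialSource;
        }

        @Override
        public String toString() {
            return operators + " useSerialSource=" + useSerialSource;
        }
    }

    /** operatorsBottomUp lists operators from the source upward; "LE" marks a local exchange. */
    static List<Pipeline> split(List<String> operatorsBottomUp, boolean fragmentUsesSerialSource) {
        List<Pipeline> pipelines = new ArrayList<>();
        List<String> current = new ArrayList<>();
        boolean serial = fragmentUsesSerialSource; // the fragment-level flag
        for (String op : operatorsBottomUp) {
            if (op.equals("LE")) {
                // Close the pipeline below the exchange with the exchange sink.
                current.add("LE_SINK");
                pipelines.add(new Pipeline(List.copyOf(current), serial));
                // The pipeline above starts at the exchange source and is not serial.
                current = new ArrayList<>();
                current.add("LE_SOURCE");
                serial = false;
            } else {
                current.add(op);
            }
        }
        pipelines.add(new Pipeline(List.copyOf(current), serial));
        return pipelines;
    }

    public static void main(String[] args) {
        // Simplified chain: pooling scan, APT exchange, NLJ probe, streaming agg.
        for (Pipeline p : split(List.of("OlapScan", "LE", "NLJ_PROBE", "STREAMING_AGG"), true)) {
            System.out.println(p);
        }
        // [OlapScan, LE_SINK] useSerialSource=true
        // [LE_SOURCE, NLJ_PROBE, STREAMING_AGG] useSerialSource=false
    }
}
```

The fragment flag passed in is still `true`, yet the pipeline containing StreamingAgg reports `useSerialSource=false`, which is why checking only the fragment-level flag over-fires.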
    
    Fix: add a children.get(0).isSerialOperator() guard so PASSTHROUGH is only
    required when the direct child is actually serial (e.g. a pooling OlapScan).
    An INNER/LEFT NLJ returns false → noRequire → no spurious PASSTHROUGH is
    inserted.
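A simplified sketch of the FE condition before and after this change, assuming a hypothetical `Node` type with only a serial-operator flag. It deliberately omits the earlier `HashJoinNode` / `enableStreamingAggHashJoinForcePassthrough` branch and is not the real `PlanNode` or `LocalExchangeTypeRequire` API.

```java
// Hypothetical model of the streaming-preagg branch in
// AggregationNode.enforceAndDeriveLocalExchange(); names are illustrative only.
final class StreamingAggFixSketch {

    enum Require { PASSTHROUGH, NO_REQUIRE }

    static final class Node {
        final String name;
        final boolean serialOperator;

        Node(String name, boolean serialOperator) {
            this.name = name;
            this.serialOperator = serialOperator;
        }
    }

    /** Old rule: only the fragment-level serial-source flag is consulted (child unused). */
    static Require before(boolean fragmentUsesSerialSource, Node child) {
        return fragmentUsesSerialSource ? Require.PASSTHROUGH : Require.NO_REQUIRE;
    }

    /** New rule: the immediate child must also be a serial operator. */
    static Require after(boolean fragmentUsesSerialSource, Node child) {
        return fragmentUsesSerialSource && child.serialOperator
                ? Require.PASSTHROUGH
                : Require.NO_REQUIRE;
    }

    public static void main(String[] args) {
        Node poolingScan = new Node("OlapScan(pooling)", true);
        Node outerNlj = new Node("outer_NLJ (INNER)", false);
        // Agg over NLJ: the old rule adds a spurious PASSTHROUGH, the new rule does not.
        System.out.println(before(true, outerNlj) + " -> " + after(true, outerNlj));
        // prints "PASSTHROUGH -> NO_REQUIRE"
        // Agg directly over a pooling scan still requires PASSTHROUGH.
        System.out.println(before(true, poolingScan) + " -> " + after(true, poolingScan));
        // prints "PASSTHROUGH -> PASSTHROUGH"
    }
}
```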
    
    This moves nested_nlj_pooling_scan and agg_after_nlj_pooling_scan from
    knownDiff to MATCH (55/55 test cases now match).
---
 .../main/java/org/apache/doris/planner/AggregationNode.java    | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
index fd2f67dfa49..3b1aa65230f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
@@ -297,7 +297,15 @@ public class AggregationNode extends PlanNode {
             if (children.get(0) instanceof HashJoinNode
                     && sessionVariable.enableStreamingAggHashJoinForcePassthrough) {
                 requireChild = LocalExchangeTypeRequire.requirePassthrough();
-            } else if (fragment != null && fragment.useSerialSource(connectContext)) {
+            } else if (fragment != null && fragment.useSerialSource(connectContext)
+                    && children.get(0).isSerialOperator()) {
+                // Mirrors BE StreamingAggOperatorX::required_data_distribution():
+                //   return _child->is_serial_operator() ? PASSTHROUGH : NOOP
+                // Check the IMMEDIATE child's serialness (e.g. pooling OlapScan → serial=true,
+                // NLJ (INNER/LEFT) → serial=false).  Using fragment.useSerialSource() alone
+                // over-fires when NLJ exchanges have been inserted below — in BE, after an
+                // APT exchange is inserted, the new pipeline has _use_serial_source=false,
+                // so StreamingAgg returns NOOP in that pipeline.
                 requireChild = LocalExchangeTypeRequire.requirePassthrough();
             } else {
                 requireChild = LocalExchangeTypeRequire.noRequire();

