morrySnow commented on code in PR #63366:
URL: https://github.com/apache/doris/pull/63366#discussion_r3412891906


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java:
##########
@@ -109,10 +109,36 @@ public class PlanTranslatorContext {
 
     private final Map<ScanNode, Set<SlotId>> statsUnknownColumnsMap = Maps.newHashMap();
 
+    // Per-node "is there a serial operator between me and the pipeline's sink" flag.
+    // Mirrors BE's any_of(operators[idx..end], is_serial_operator) check used by
+    // _add_local_exchange / need_to_local_exchange to skip LE insertion when an ancestor
+    // in the same pipeline is already serial (the whole pipeline runs with 1 task, so an
+    // extra LE would be a no-op).  Written by AddLocalExchange entry + PlanNode.enforceRequire
+    // step 1 (root → leaf during traversal).  Read by PlanNode.enforceRequire step 4 (Layer 1
+    // skip) and by child overrides that compute their require.  Reset to false at fragment
+    // root and across pipeline boundaries (see shouldResetSerialFlagForChild).
+    private final Map<PlanNodeId, Boolean> serialAncestorInPipelineMap = Maps.newHashMap();
+
+    // Per-node "is there a downstream operator that depends on hash distribution for
+    // correctness, with HASH/NOOP path connecting it to me" flag.  Mirrors BE's
+    // _followed_by_shuffled_operator propagation in pipeline_fragment_context.cpp.
+    // Written by PlanNode.enforceRequire step 1b (root → leaf).  Read by SetOperationNode
+    // to decide whether to propagate hash requirement to its inputs (only when downstream
+    // needs shuffle for correctness, not just for performance like StreamingAgg pre-agg).
+    private final Map<PlanNodeId, Boolean> shuffledAncestorMap = Maps.newHashMap();
+
+    // Whether the current fragment uses LocalShuffleAssignedJob (pooling scan with
+    // ignoreDataDistribution → _parallel_instances=1 in BE). When true, serial operators
+    // indicate real pipeline bottlenecks needing PASSTHROUGH fan-out (heavy_ops).
     private boolean isTopMaterializeNode = true;
 
     private final Set<SlotId> virtualColumnIds = Sets.newHashSet();
 
+    // Used by AddLocalExchange: tracks whether any serial operator exists

Review Comment:
   **Dead code**: The field `serialAncestorInCurrentPipeline` and its getter/setter (L420-424) are unused; no code calls either accessor. The actual serial-ancestor propagation uses `serialAncestorInPipelineMap` (the per-PlanNodeId map).
   
   Consider removing this field and its accessors so that future maintainers do not confuse the two mechanisms.



##########
fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java:
##########
@@ -733,7 +737,7 @@ public ScanContext getScanContext() {
     }
 
     @Override
-    public boolean isSerialOperator() {
+    public boolean isSerialNode() {
         ConnectContext context = ConnectContext.get();
         if (context == null) {
             return numScanBackends() <= 0;

Review Comment:
   **Potential regression**: `isSerialNode()` no longer checks `fragment.useSerialSource(context)`, but `Coordinator.assignScanRanges` (legacy path) still calls `isSerialNode()` directly to decide whether to assign all scan ranges to the first instance.
   
   Old logic: `numScanBackends() <= 0 || fragment.useSerialSource(context)` (OR semantics).
   New logic: `numScanBackends() <= 0 || getScanRangeNum() < ... || isForceToLocalShuffle()`.
   The `fragment.useSerialSource()` check is gone.
   
   Consider a fragment in serial-source (pooling) mode with a multi-backend scan and enough scan ranges. Here `isSerialNode()` returns `false`, so scan ranges are scattered across instances. The downstream ExchangeNode, however, is serial (1 task) and expects data from only one instance. Data from every instance other than the first is therefore lost.
   
   **Suggestion**: Either add `|| (fragment != null && fragment.useSerialSource(context))` back to `isSerialNode()`, or override `isSerialOperatorOnBe()` in ScanNode with OR semantics and change Coordinator to call `isSerialOperatorOnBe()`. Note that the inherited `isSerialOperatorOnBe` uses AND (`isSerialNode() && fragment.useSerialSource()`). Without an override, it cannot express the old OR behavior.
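To make the OR-versus-AND point concrete, here is a minimal model of the three predicates. `FragmentModel`, `ScanNodeModel`, and the `parallelism` threshold are hypothetical stand-ins; the real scan-range condition is elided above as `...`. Only the boolean structure is meant to mirror the code under review.

```java
/** Hypothetical stand-in for PlanFragment; only the serial-source (pooling) switch matters here. */
final class FragmentModel {
    final boolean useSerialSource;

    FragmentModel(boolean useSerialSource) {
        this.useSerialSource = useSerialSource;
    }
}

/** Hypothetical stand-in for ScanNode; plain fields replace the real scan-range bookkeeping. */
final class ScanNodeModel {
    final int numScanBackends;
    final int scanRangeNum;
    final int parallelism;            // hypothetical threshold for "too few scan ranges"
    final boolean forceToLocalShuffle;
    final FragmentModel fragment;     // may be null, which the suggested fix guards against

    ScanNodeModel(int numScanBackends, int scanRangeNum, int parallelism,
            boolean forceToLocalShuffle, FragmentModel fragment) {
        this.numScanBackends = numScanBackends;
        this.scanRangeNum = scanRangeNum;
        this.parallelism = parallelism;
        this.forceToLocalShuffle = forceToLocalShuffle;
        this.fragment = fragment;
    }

    private boolean usesSerialSource() {
        return fragment != null && fragment.useSerialSource;
    }

    /** Shape of the new logic in this PR: no fragment-level check. */
    boolean isSerialNodeAsInPr() {
        return numScanBackends <= 0 || scanRangeNum < parallelism || forceToLocalShuffle;
    }

    /** First suggestion: OR the fragment check back in (old OR semantics). */
    boolean isSerialNodeWithFix() {
        return isSerialNodeAsInPr() || usesSerialSource();
    }

    /** AND form of isSerialOperatorOnBe: false whenever isSerialNodeAsInPr() is false. */
    boolean isSerialOperatorOnBeAndForm() {
        return isSerialNodeAsInPr() && usesSerialSource();
    }
}
```

Take a pooling fragment with 3 backends and 100 ranges. Here `isSerialNodeAsInPr()` and the AND form both return `false`, so a caller relying on either would scatter ranges. Only the OR form returns `true`, which keeps every range on the first instance.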



##########
fe/fe-core/src/main/java/org/apache/doris/planner/BucketedAggregationNode.java:
##########
@@ -97,7 +98,7 @@ public String getNodeExplainString(String detailPrefix, TExplainLevel detailLeve
     }
 
     @Override
-    public boolean isSerialOperator() {
+    public boolean isSerialOperatorOnBe(ConnectContext context) {

Review Comment:
   **Wrong method override**: This overrides `isSerialOperatorOnBe(ConnectContext context)` to return `false` directly, while every other node overrides `isSerialNode()`.
   
   `BucketedAggregationNode extends AggregationNode`, which overrides `isSerialNode()` as `aggInfo.getGroupingExprs().isEmpty() && needsFinalize`. Suppose a finalizing BucketedAgg without GROUP BY keys is ever introduced (despite the comment saying it won't happen). Then `isSerialNode()` would return `true` while `isSerialOperatorOnBe()` would still return `false`, giving inconsistent serial semantics.
   
   There is no bug today, because BucketedAgg always has GROUP BY keys. The inconsistent override pattern is still a latent bug.
   
   **Suggestion**: Override `isSerialNode()` to return `false` instead, matching the comment ("never a serial operator") and following the same pattern as all other node types.
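Here is a minimal sketch of the suggested pattern. It assumes, as the previous comment describes, that `isSerialOperatorOnBe` is derived as `isSerialNode() && fragment.useSerialSource()`. The `*Model` classes are hypothetical, and the fragment flag is passed in as a plain boolean. Marking the derived method `final` is one option, not what the PR does. The point is that when subclasses override only `isSerialNode()`, the two predicates cannot disagree.

```java
import java.util.List;

/** Hypothetical, trimmed-down PlanNode: the BE-facing predicate is derived, never overridden. */
abstract class PlanNodeModel {
    /** Node-local property; this is the only method subclasses override. */
    boolean isSerialNode() {
        return false;
    }

    /** Derived predicate; final here so no subclass can make it disagree with isSerialNode(). */
    final boolean isSerialOperatorOnBe(boolean fragmentUsesSerialSource) {
        return isSerialNode() && fragmentUsesSerialSource;
    }
}

class AggregationNodeModel extends PlanNodeModel {
    final List<String> groupingExprs;
    final boolean needsFinalize;

    AggregationNodeModel(List<String> groupingExprs, boolean needsFinalize) {
        this.groupingExprs = groupingExprs;
        this.needsFinalize = needsFinalize;
    }

    @Override
    boolean isSerialNode() {
        // Same condition the review quotes for AggregationNode.
        return groupingExprs.isEmpty() && needsFinalize;
    }
}

class BucketedAggregationNodeModel extends AggregationNodeModel {
    BucketedAggregationNodeModel(List<String> groupingExprs, boolean needsFinalize) {
        super(groupingExprs, needsFinalize);
    }

    @Override
    boolean isSerialNode() {
        // Never a serial operator; overriding here keeps both predicates in agreement.
        return false;
    }
}
```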



##########
fe/fe-core/src/main/java/org/apache/doris/planner/MaterializationNode.java:
##########
@@ -227,7 +231,17 @@ public void setTopMaterializeNode(boolean topMaterializeNode) {
     }
 
     @Override
-    public boolean isSerialOperator() {
+    public boolean isSerialNode() {
         return true;
     }
+
+    @Override
+    public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(
+            PlanTranslatorContext translatorContext, PlanNode parent, LocalExchangeTypeRequire parentRequire) {
+        Pair<PlanNode, LocalExchangeType> enforceResult = enforceRequire(
+                translatorContext, children.get(0), 0, LocalExchangeTypeRequire.requirePassthrough());
+        children = new ArrayList<>();

Review Comment:
   **Style inconsistency**: Uses `new ArrayList<>()` while other nodes (AggregationNode, HashJoinNode, AssertNumRowsNode, etc.) use `Lists.newArrayList()`. Consider using `Lists.newArrayList()` for consistency.



##########
fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java:
##########
@@ -258,4 +268,155 @@ public boolean isQueryCacheCandidate() {
     public void setQueryCacheCandidate(boolean queryCacheCandidate) {
         this.queryCacheCandidate = queryCacheCandidate;
     }
+
+    @Override
+    public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(
+            PlanTranslatorContext translatorContext, PlanNode parent, LocalExchangeTypeRequire parentRequire) {
+
+        ConnectContext connectContext = translatorContext.getConnectContext();
+        SessionVariable sessionVariable = connectContext.getSessionVariable();
+        // PR #62438: when false, non-finalize agg falls back to BE base class.
+        boolean enableLeBeforeAgg = sessionVariable.enableLocalExchangeBeforeAgg;
+        boolean hasKeys = !aggInfo.getGroupingExprs().isEmpty();
+
+        // Each branch mirrors the corresponding BE operator's required_data_distribution()
+        // check order 1:1. The helper baseClassRequire() expands BE's base class behavior.
+        LocalExchangeTypeRequire requireChild;
+        if (canUseDistinctStreamingAgg(sessionVariable)) {
+            // DistinctStreamingAggOperatorX.  Two flavors share this operator class:
+            //   - streaming preagg (useStreamingPreagg=true): performance-only,
+            //     flag controls
+            //   - non-streaming dedup (useStreamingPreagg=false): correctness-required,
+            //     always HASH regardless of flag
+            // Diverges from BE: BE's `!_needs_finalize && !enable_local_exchange_before_agg`
+            // early return catches non-streaming dedup too, causing the same family of
+            // wrong-result bug as AggSink (DORIS-25413).
+            if (needsFinalize && !hasKeys) {
+                requireChild = LocalExchangeTypeRequire.noRequire();
+            } else if (!needsFinalize && useStreamingPreagg && !enableLeBeforeAgg) {
+                requireChild = baseClassRequire(connectContext);
+            } else if (needsFinalize || (hasKeys && !useStreamingPreagg)) {
+                requireChild = AddLocalExchange.isColocated(this)
+                        ? LocalExchangeTypeRequire.requireHash()
+                        : parentRequire.autoRequireHash();
+            } else if (sessionVariable.enableDistinctStreamingAggForcePassthrough) {
+                requireChild = LocalExchangeTypeRequire.requirePassthrough();
+            } else {
+                requireChild = baseClassRequire(connectContext);
+            }
+        } else if (useStreamingPreagg) {
+            // StreamingAggOperatorX
+            if (children.get(0) instanceof HashJoinNode
+                    && sessionVariable.enableStreamingAggHashJoinForcePassthrough) {
+                requireChild = LocalExchangeTypeRequire.requirePassthrough();
+            } else if (!needsFinalize && !enableLeBeforeAgg) {
+                requireChild = baseClassRequire(connectContext);
+            } else if (!hasKeys) {
+                requireChild = needsFinalize
+                        ? LocalExchangeTypeRequire.noRequire()
+                        : baseClassRequire(connectContext);
+            } else {
+                requireChild = LocalExchangeTypeRequire.requireHash();
+            }
+        } else {
+            // AggSinkOperatorX — covers finalize phase AND non-finalize phases (LOCAL
+            // preagg / FIRST_MERGE dedup). Streaming preagg goes through the StreamingAgg
+            // branch above, not here.
+            //
+            // Phase semantics for !needsFinalize:
+            //   - FIRST / SECOND (LOCAL phase, !isMerge): performance-only, flag controls
+            //   - FIRST_MERGE (correctness-required): always HASH regardless of flag
+            //
+            // Diverges from BE here: BE's `!_needs_finalize && !enable_local_exchange_before_agg`
+            // early return also catches FIRST_MERGE, dropping the HASH requirement and
+            // causing wrong-result (e.g. PASSTHROUGH over serial child breaks the
+            // group-by-key invariant — DORIS-25413).
+            if (!hasKeys) {
+                requireChild = needsFinalize
+                        ? LocalExchangeTypeRequire.noRequire()
+                        : baseClassRequire(connectContext);
+            } else if (!needsFinalize && !aggInfo.isMerge() && !enableLeBeforeAgg) {
+                // LOCAL phase (FIRST preagg / SECOND distinct local) + user opted out
+                // of pre-agg LE → base class decides: serial child → PASSTHROUGH
+                // (parallelism), non-serial child → NOOP (no LE).
+                requireChild = baseClassRequire(connectContext);
+            } else if (!needsFinalize || AddLocalExchange.isColocated(this)) {
+                // FIRST_MERGE (correctness) or finalize+colocate → HASH.
+                requireChild = parentRequire.autoRequireHash();
+            } else if (hasPartitionExprs(parentRequire)) {
+                // FE-only heuristic: finalize non-colocate with parent hash requirement
+                // → inherit parent's specific hash type.
+                requireChild = parentRequire.autoRequireHash();
+            } else {
+                // FE-only heuristic: finalize non-colocate without parent hash → skip
+                // LE (child Exchange already provides hash distribution).
+                requireChild = LocalExchangeTypeRequire.noRequire();
+            }
+        }
+
+        Pair<PlanNode, LocalExchangeType> enforceResult
+                = enforceRequire(translatorContext, children.get(0), 0, requireChild);
+        children = Lists.newArrayList(enforceResult.first);
+        return Pair.of(this, enforceResult.second);
+    }
+
+    /** BE base class required_data_distribution: serial child → PASSTHROUGH, else → NOOP. */
+    private LocalExchangeTypeRequire baseClassRequire(ConnectContext connectContext) {
+        return children.get(0).isSerialOperatorOnBe(connectContext)
+                ? LocalExchangeTypeRequire.requirePassthrough()
+                : LocalExchangeTypeRequire.noRequire();
+    }
+
+    @Override
+    protected List<Expr> getSemanticPartitionExprs() {
+        return aggInfo.getGroupingExprs();
+    }
+
+    @Override
+    protected List<Expr> getLocalExchangeDistributeExprs(int childIndex, boolean followedByShuffled) {
+        // Mirror BE AggSinkOperatorX::update_operator / StreamingAggOperatorX::update_operator:
+        //   _partition_exprs = (distribute_expr_lists set && (followed_by_shuffled || has_distinct))
+        //                      ? distribute_expr_lists[0] : grouping_exprs
+        // The HASH LocalExchange must partition by _partition_exprs so a streaming partial preagg
+        // locally collapses same-key rows.  Using child distribution (default) for a non-shuffled
+        // chain scatters same-group rows across N instances, leaving partial_preagg essentially a
+        // no-op and breaking row-arrival order at downstream merge-finalize (e.g. group_concat).
+        List<Expr> childDist = getChildDistributeExprList(childIndex);
+        boolean hasDistinct = aggInfo.getAggregateExprs().stream()
+                .map(FunctionCallExpr::getFnName)
+                .filter(name -> name != null)
+                .map(name -> name.getFunction())
+                .filter(name -> name != null)
+                .anyMatch(name -> name.startsWith("multi_distinct_"));

Review Comment:
   **Fragile string-based detection**: `hasDistinct` uses `name.startsWith("multi_distinct_")` to detect multi-distinct aggregate functions. Prefix matching on function names is fragile. If Doris renames the multi-distinct functions, or adds a function with the same prefix but different semantics, this check silently breaks.
   
   The BE equivalent uses structural checks (`tnode.agg_node.__isset.aggregate_functions`), which are more robust.
   
   **Suggestion**: Add an API on `AggInfo` (e.g., `hasDistinctAggregates()` or `isMultiDistinct()`) that exposes the presence of multi-distinct aggregates structurally, rather than having callers infer it from function-name strings.
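Here is one possible shape for the suggested API, as a hedged sketch. `AggFnKind`, `AggCallModel`, and `AggInfoModel` are hypothetical, and `hasDistinctAggregates()` takes the name proposed above. The sketch records the multi-distinct property when a function is resolved and exposes it as a boolean. A look-alike name (here the made-up `multi_distinct_sketch`) therefore cannot flip the result.

```java
import java.util.List;

/** Hypothetical tag assigned when an aggregate function is resolved. */
enum AggFnKind {
    PLAIN,
    MULTI_DISTINCT
}

/** Hypothetical stand-in for a resolved aggregate call (FunctionCallExpr). */
final class AggCallModel {
    final String fnName;
    final AggFnKind kind;

    AggCallModel(String fnName, AggFnKind kind) {
        this.fnName = fnName;
        this.kind = kind;
    }
}

/** Hypothetical stand-in for AggInfo exposing the structural query. */
final class AggInfoModel {
    private final boolean hasDistinctAggregates;

    AggInfoModel(List<AggCallModel> aggregateExprs) {
        // Decided from the resolved kind, never from how the function is spelled.
        this.hasDistinctAggregates = aggregateExprs.stream()
                .anyMatch(call -> call.kind == AggFnKind.MULTI_DISTINCT);
    }

    boolean hasDistinctAggregates() {
        return hasDistinctAggregates;
    }

    /** The prefix heuristic from the diff, kept only for comparison. */
    static boolean prefixHeuristic(List<AggCallModel> aggregateExprs) {
        return aggregateExprs.stream().anyMatch(call -> call.fnName.startsWith("multi_distinct_"));
    }
}
```

With such an API, the `hasDistinct` computation in `getLocalExchangeDistributeExprs` would collapse to a single call on `aggInfo`.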



##########
fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java:
##########
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
+import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
+import org.apache.doris.nereids.trees.plans.distribute.PipelineDistributedPlan;
+import org.apache.doris.planner.LocalExchangeNode.LocalExchangeType;
+import org.apache.doris.planner.LocalExchangeNode.LocalExchangeTypeRequire;
+import org.apache.doris.planner.LocalExchangeNode.RequireHash;
+
+/**
+ * FE-side local exchange planner — inserts {@link LocalExchangeNode} into each fragment's
+ * plan tree so that within-fragment data redistribution is decided at planning time
+ * instead of at BE pipeline-build time.
+ *
+ * <h3>When this runs</h3>
+ * Invoked from {@code NereidsPlanner.addLocalExchangeAfterDistribute()} right after
+ * {@code DistributePlanner} has assigned instances to fragments and before the plan is
+ * serialized to BE.  Gated by session variable {@code enable_local_shuffle_planner}
+ * (default true) and {@code enable_local_shuffle}; when either is off this pass is
+ * skipped entirely and BE falls back to its own {@code _plan_local_exchange}.  The two
+ * paths are mutually exclusive: BE consults {@code runtime_state.h::plan_local_shuffle()}
+ * to know whether it should plan LE itself.
+ *
+ * <h3>What it changes</h3>
+ * <ul>
+ *   <li>For each fragment with {@code maxPerBeInstances > 1}, walks the plan tree
+ *       bottom-up via {@link PlanNode#enforceAndDeriveLocalExchange} and inserts
+ *       LocalExchangeNodes where children's output distribution doesn't satisfy the
+ *       parent's requirement.</li>
+ *   <li>May wrap the fragment root with an extra PASSTHROUGH LE so the data sink
+ *       (DataStreamSink / OlapTableSink) runs with the full instance count even when
+ *       the root operator is serial — see {@link #addLocalExchangeForFragment}.</li>
+ *   <li>Does NOT modify the fragment sink itself, fragment boundaries, or instance
+ *       assignment.</li>
+ * </ul>
+ *
+ * <h3>Per-BE instance semantics</h3>
+ * Skips fragments where every BE has at most 1 instance.  Using a global instance count
+ * would insert LE for "2 BEs × 1 instance" cases, which BE's own
+ * {@code _plan_local_exchange} would not — leading to pipeline task-count mismatch and
+ * deadlock.  See {@link #addLocalExchange}.
+ *
+ * <h3>Reading order</h3>
+ * Start with {@link PlanNode#enforceRequire} (the recursion engine), then individual
+ * {@code enforceAndDeriveLocalExchange} overrides on PlanNode subclasses.
+ */
+public class AddLocalExchange {
+    /** addLocalExchange with distributed plans, skipping single-instance fragments.
+     *  BE's _plan_local_exchange checks _num_instances which is the per-BE instance count.
+     *  With _num_instances<=1 all pipelines on that BE have 1 task so local exchange is a no-op.
+     *  We must use the same per-BE semantics: skip when every BE has at most 1 instance.
+     *  Using global instanceCount would insert LE for fragments where 2 BEs each have 1 instance
+     *  (global=2, per-BE=1), causing pipeline task mismatch and deadlock. */
+    public void addLocalExchange(FragmentIdMapping<DistributedPlan> distributedPlans,
+            PlanTranslatorContext context) {
+        for (DistributedPlan plan : distributedPlans.values()) {
+            PipelineDistributedPlan pipePlan = (PipelineDistributedPlan) plan;
+            long maxPerBeInstances = pipePlan.getInstanceJobs().stream()
+                    .collect(java.util.stream.Collectors.groupingBy(
+                            j -> j.getAssignedWorker().id(), java.util.stream.Collectors.counting()))
+                    .values().stream().mapToLong(Long::longValue).max().orElse(0);
+            if (maxPerBeInstances <= 1) {
+                continue;
+            }
+            PlanFragment fragment = pipePlan.getFragmentJob().getFragment();
+            addLocalExchangeForFragment(fragment, context);
+        }
+    }
+
+    private void addLocalExchangeForFragment(PlanFragment fragment, PlanTranslatorContext context) {
+        DataSink sink = fragment.getSink();
+        LocalExchangeTypeRequire require = sink == null
+                ? LocalExchangeTypeRequire.noRequire() : sink.getLocalExchangeTypeRequire();
+        PlanNode root = fragment.getPlanRoot();
+        context.setHasSerialAncestorInPipeline(root, false);
+        Pair<PlanNode, LocalExchangeType> output = root
+                .enforceAndDeriveLocalExchange(context, null, require);
+        PlanNode newRoot = output.first;
+        // The fragment data sink (DataStreamSink, OlapTableSink) runs in the same pipeline
+        // as the root. If the root will be serial on BE, the sink pipeline has 1 task —
+        // only instance 0 sends data, others hang or miss writes.
+        // Insert PASSTHROUGH fan-out so sink runs with _num_instances tasks.
+        // This matches BE-native's default required_data_distribution():
+        //   _child->is_serial_operator() ? PASSTHROUGH : NOOP
+        if (newRoot.isSerialOperatorOnBe(context.getConnectContext())) {
+            newRoot = new LocalExchangeNode(context.nextPlanNodeId(), newRoot,
+                    LocalExchangeType.PASSTHROUGH, null);
+        }
+        if (newRoot != root) {
+            fragment.setPlanRoot(newRoot);
+        }
+    }
+
+    public static boolean isColocated(PlanNode plan) {
+        if (plan instanceof AggregationNode) {
+            return ((AggregationNode) plan).isColocate() && isColocated(plan.getChild(0));
+        } else if (plan instanceof OlapScanNode) {
+            return true;
+        } else if (plan instanceof SelectNode) {
+            return isColocated(plan.getChild(0));
+        } else if (plan instanceof HashJoinNode) {
+            return ((HashJoinNode) plan).isColocate()
+                    && (isColocated(plan.getChild(0)) || isColocated(plan.getChild(1)));
+        } else if (plan instanceof SetOperationNode) {
+            if (!((SetOperationNode) plan).isColocate()) {
+                return false;
+            }
+            for (PlanNode child : plan.getChildren()) {
+                if (isColocated(child)) {
+                    return true;
+                }
+            }
+            return false;
+        } else {
+            return false;
+        }
+    }
+
+    public static LocalExchangeType resolveExchangeType(LocalExchangeTypeRequire require) {

Review Comment:
   **`preferType()` / `resolveExchangeType()` inconsistency**: `RequireHash.preferType()` (at LocalExchangeNode L240-241) returns `GLOBAL_EXECUTION_HASH_SHUFFLE`, but `resolveExchangeType()` always returns `LOCAL_EXECUTION_HASH_SHUFFLE` for `RequireHash`. So `preferType()` is not authoritative; `resolveExchangeType()` decides which exchange type is actually inserted.
   
   Currently `hasPartitionExprs()` uses `preferType().isHashShuffle()`. That works correctly, because `isHashShuffle()` is true for both GLOBAL and LOCAL. The discrepancy could still confuse future maintainers.
   
   **Suggestion**: Change `RequireHash.preferType()` to return `LOCAL_EXECUTION_HASH_SHUFFLE` to align with `resolveExchangeType()`.
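Here is a small model of the suggested alignment. `ExchangeTypeModel`, `RequireModel`, `RequireHashModel`, and `ResolverModel` are hypothetical stand-ins for `LocalExchangeType`, `LocalExchangeTypeRequire`, `RequireHash`, and `AddLocalExchange`. The resolver hard-codes the local hash type for hash requirements, which is what `resolveExchangeType()` is described as doing.

```java
/** Hypothetical subset of LocalExchangeType. */
enum ExchangeTypeModel {
    NOOP,
    PASSTHROUGH,
    LOCAL_EXECUTION_HASH_SHUFFLE,
    GLOBAL_EXECUTION_HASH_SHUFFLE;

    boolean isHashShuffle() {
        return this == LOCAL_EXECUTION_HASH_SHUFFLE || this == GLOBAL_EXECUTION_HASH_SHUFFLE;
    }
}

/** Hypothetical stand-in for LocalExchangeTypeRequire. */
interface RequireModel {
    ExchangeTypeModel preferType();
}

/** Hypothetical stand-in for RequireHash after the suggested change. */
final class RequireHashModel implements RequireModel {
    @Override
    public ExchangeTypeModel preferType() {
        // Previously GLOBAL_EXECUTION_HASH_SHUFFLE; now matches what the resolver inserts.
        return ExchangeTypeModel.LOCAL_EXECUTION_HASH_SHUFFLE;
    }
}

/** Hypothetical stand-in for AddLocalExchange.resolveExchangeType. */
final class ResolverModel {
    static ExchangeTypeModel resolveExchangeType(RequireModel require) {
        if (require instanceof RequireHashModel) {
            return ExchangeTypeModel.LOCAL_EXECUTION_HASH_SHUFFLE;
        }
        return require.preferType();
    }
}
```

Once `preferType()` is aligned, a unit test asserting `preferType() == resolveExchangeType(require)` for each requirement type would catch future drift. `hasPartitionExprs()` is unaffected, because `isHashShuffle()` accepts both hash variants.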



##########
fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java:
##########
@@ -939,4 +1026,261 @@ private String mergeIcebergAccessPathsWithId(
         }
         return StringUtils.join(mergeDisplayAccessPaths, ", ");
     }
+
+    public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(
+            PlanTranslatorContext translatorContext, PlanNode parent, LocalExchangeTypeRequire parentRequire) {
+        ArrayList<PlanNode> newChildren = Lists.newArrayList();
+        for (int i = 0; i < children.size(); i++) {
+            Pair<PlanNode, LocalExchangeType> childOutput
+                    = enforceRequire(translatorContext, children.get(i), i, LocalExchangeTypeRequire.noRequire());
+            newChildren.add(childOutput.first);
+        }
+        this.children = newChildren;
+        return Pair.of(this, LocalExchangeType.NOOP);
+    }
+
+    /**
+     * Unified framework method: propagate serial flag → recurse child → satisfy check → Layer 1 skip → insert LE.
+     * Replaces the old enforceChild/enforceChildExchange/forceEnforceChildExchange trio.
+     *
+     * <h3>Data flow</h3>
+     * <ul>
+     *   <li><b>serial-ancestor flag</b> ({@link PlanTranslatorContext#hasSerialAncestorInPipeline})
+     *       — flows root → leaf during traversal.  Mirrors BE's
+     *       {@code any_of(operators[idx..end], is_serial_operator)} check used by
+     *       {@code _add_local_exchange} to skip LE insertion when an ancestor in the same
+     *       pipeline is already serial.  Reset at pipeline boundaries via
+     *       {@link #shouldResetSerialFlagForChild}.</li>
+     *   <li><b>shuffle-for-correctness flag</b>
+     *       ({@link PlanTranslatorContext#hasShuffleForCorrectnessAncestor}) — also flows
+     *       root → leaf.  Mirrors BE's {@code _followed_by_shuffled_operator}: tells a
+     *       child whether some downstream operator depends on hash distribution for
+     *       correctness, so {@code SetOperationNode} can pre-shuffle union branches.</li>
+     *   <li><b>return value</b> {@code Pair<PlanNode, LocalExchangeType>} — first is the
+     *       (possibly LE-wrapped) child; second is the actual output distribution as
+     *       observed by the parent.  Caller's {@code require.satisfy(output)} decides
+     *       whether more LE is needed.</li>
+     *   <li><b>parent.require</b> describes the constraint on the child output —
+     *       computed inside the parent's {@code enforceAndDeriveLocalExchange} per child.</li>
+     * </ul>
+     *
+     * <h3>Invariants</h3>
+     * <ul>
+     *   <li>Every serial → non-serial transition has an LE somewhere between them
+     *       (enforced by framework step 3 below, validated by
+     *       {@link AddLocalExchange#validateNoSerialWithoutLocalExchange}).</li>

Review Comment:
   **Missing method referenced in javadoc**: The javadoc references `AddLocalExchange#validateNoSerialWithoutLocalExchange`, but this method doesn't exist in `AddLocalExchange`. Without it, nothing verifies the invariant "every serial to non-serial transition has an LE somewhere between them". The javadoc claims a validation that never runs.
   
   **Suggestion**: Either implement the method (a post-pass validation walk over the plan tree after `addLocalExchange` completes), or remove the reference from the javadoc.
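If the method is implemented, the post-pass could look roughly like the sketch below. `NodeModel` and `SerialInvariantValidator` are hypothetical. The sketch models the fragment sink as a non-serial consumer of the root. It also honors the Layer 1 skip: a serial child is acceptable when its parent is a LocalExchangeNode, or when some node at or above the parent in the same pipeline is already serial. As a simplification, it treats only LocalExchangeNodes as pipeline boundaries and ignores join build sides and other breaks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical plan-node model carrying only what the invariant check needs. */
final class NodeModel {
    final String name;
    final boolean serial;
    final boolean localExchange;
    final List<NodeModel> children;

    NodeModel(String name, boolean serial, boolean localExchange, NodeModel... children) {
        this.name = name;
        this.serial = serial;
        this.localExchange = localExchange;
        this.children = List.copyOf(Arrays.asList(children));
    }
}

/** Hypothetical post-pass in the spirit of validateNoSerialWithoutLocalExchange. */
final class SerialInvariantValidator {
    /** Returns "parent -> child" edges where a serial node feeds a non-serial consumer without an LE. */
    static List<String> findViolations(NodeModel root) {
        List<String> violations = new ArrayList<>();
        // The fragment sink shares the root's pipeline and is modeled as non-serial.
        if (root.serial) {
            violations.add("SINK -> " + root.name);
        }
        walk(root, root.serial, violations);
        return violations;
    }

    private static void walk(NodeModel parent, boolean serialAtOrAbove, List<String> violations) {
        for (NodeModel child : parent.children) {
            // Layer 1 skip: fine if the parent is an LE or the pipeline above is already serial.
            if (child.serial && !parent.localExchange && !serialAtOrAbove) {
                violations.add(parent.name + " -> " + child.name);
            }
            // An LE starts a new pipeline below it, so the flag restarts at the child.
            boolean childFlag = parent.localExchange ? child.serial : serialAtOrAbove || child.serial;
            walk(child, childFlag, violations);
        }
    }
}
```

A real implementation would walk `fragment.getPlanRoot()` after `addLocalExchange` finishes. It would also need the same pipeline-boundary rules as `shouldResetSerialFlagForChild`.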



-- 
This is an automated message from the Apache Git Service.