morrySnow commented on code in PR #63366:
URL: https://github.com/apache/doris/pull/63366#discussion_r3414779422


##########
fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java:
##########
@@ -939,4 +1031,263 @@ private String mergeIcebergAccessPathsWithId(
         }
         return StringUtils.join(mergeDisplayAccessPaths, ", ");
     }
+
+    public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(
+            PlanTranslatorContext translatorContext, PlanNode parent, LocalExchangeTypeRequire parentRequire) {
+        ArrayList<PlanNode> newChildren = Lists.newArrayList();
+        for (int i = 0; i < children.size(); i++) {
+            Pair<PlanNode, LocalExchangeType> childOutput
+                    = enforceRequire(translatorContext, children.get(i), i, LocalExchangeTypeRequire.noRequire());
+            newChildren.add(childOutput.first);
+        }
+        this.children = newChildren;
+        return Pair.of(this, LocalExchangeType.NOOP);
+    }
+
+    /**
+     * Unified framework method: propagate serial flag → recurse child → satisfy check → Layer 1 skip → insert LE.
+     * Replaces the old enforceChild/enforceChildExchange/forceEnforceChildExchange trio.
+     *
+     * <h3>Data flow</h3>
+     * <ul>
+     *   <li><b>serial-ancestor flag</b> ({@link PlanTranslatorContext#hasSerialAncestorInPipeline})
+     *       — flows root → leaf during traversal.  Mirrors BE's
+     *       {@code any_of(operators[idx..end], is_serial_operator)} check used by
+     *       {@code _add_local_exchange} to skip LE insertion when an ancestor in the same
+     *       pipeline is already serial.  Reset at pipeline boundaries via
+     *       {@link #shouldResetSerialFlagForChild}.</li>
+     *   <li><b>shuffle-for-correctness flag</b>
+     *       ({@link PlanTranslatorContext#hasShuffleForCorrectnessAncestor}) — also flows
+     *       root → leaf.  Mirrors BE's {@code _followed_by_shuffled_operator}: tells a
+     *       child whether some downstream operator depends on hash distribution for
+     *       correctness, so {@code SetOperationNode} can pre-shuffle union branches.</li>
+     *   <li><b>return value</b> {@code Pair<PlanNode, LocalExchangeType>} — first is the
+     *       (possibly LE-wrapped) child; second is the actual output distribution as
+     *       observed by the parent.  Caller's {@code require.satisfy(output)} decides
+     *       whether more LE is needed.</li>
+     *   <li><b>parent.require</b> describes the constraint on the child output —
+     *       computed inside the parent's {@code enforceAndDeriveLocalExchange} per child.</li>
+     * </ul>
+     *
+     * <h3>Invariants</h3>
+     * <ul>
+     *   <li>Where a serial → non-serial transition needs redistribution, framework step 3 inserts
+     *       the LE (e.g. a serial source fanned out via PASSTHROUGH).  This is not a hard invariant:
+     *       a serial child feeding a parent that requires PASSTHROUGH / noRequire (TableFunction,
+     *       NLJ, Agg) is already correct and needs no LE, so it is intentionally not validated by a
+     *       post-pass.</li>
+     *   <li>{@code LocalExchangeNode} itself is always non-serial — setting it serial
+     *       would defeat its purpose of fanning a 1-task pipeline back to N tasks.</li>
+     *   <li>For pipeline-breaking parents ({@code shouldResetSerialFlagForChild=true}),
+     *       the child starts a fresh pipeline so {@code hasSerialAncestor} is reset; the
+     *       node's own {@code isSerialNode()} still composes in for the child's view.</li>
+     *   <li>{@code RequireHash} accepts any hash flavour; {@code RequireSpecific} demands
+     *       an exact match (with the one PASSTHROUGH/ADAPTIVE_PASSTHROUGH compatibility).
+     *       Pick the looser one whenever correctness allows — see
+     *       {@link LocalExchangeNode.LocalExchangeTypeRequire}.</li>
+     * </ul>
+     *
+     * <h3>Layers</h3>
+     * Layer 1 (shouldSkipLE): mirrors BE's need_to_local_exchange — skip when this node or
+     * an ancestor in the same pipeline is serial (operators[idx..end] has serial → skip).
+     * Layer 2 (require/output): each Node declares require and output in enforceAndDeriveLocalExchange.
+     */
+    protected Pair<PlanNode, LocalExchangeType> enforceRequire(
+            PlanTranslatorContext translatorContext, PlanNode child, int childIndex,
+            LocalExchangeTypeRequire require) {
+        // 1. Propagate serial-ancestor flag to child.
+        // For pipeline-splitting operators (shouldReset=true, e.g. non-streaming AGG):
+        //   Drop inherited serial flag from parent (parent is in a different pipeline),
+        //   but keep this node's own serial status (child is in the same pipeline as this
+        //   node's sink, e.g. Exchange is in AGG_Sink pipeline).
+        // For non-splitting operators (shouldReset=false, e.g. streaming AGG):
+        //   Inherit parent's serial flag + this node's own.
+        boolean inheritedSerial = shouldResetSerialFlagForChild(childIndex)
+                ? false : translatorContext.hasSerialAncestorInPipeline(this);
+        // Use isSerialOperatorOnBe (= isSerialNode && fragment.useSerialSource) instead of the
+        // raw isSerialNode().  BE's OperatorBase reads the Thrift `is_serial_operator` flag —
+        // which is what FE writes via isSerialOperatorOnBe — so when the fragment is not in
+        // serial-source mode, BE treats this operator as non-serial regardless of isSerialNode.
+        // Using isSerialNode here would set the child's serial-ancestor flag wider than BE's
+        // view and over-skip required LocalExchanges downstream.
+        boolean childHasSerialAncestor = inheritedSerial
+                || isSerialOperatorOnBe(translatorContext.getConnectContext());
+        translatorContext.setHasSerialAncestorInPipeline(child, childHasSerialAncestor);
+
+        // 1b. Propagate shuffle-for-correctness-ancestor flag to child.
+        // Mirrors BE's _followed_by_shuffled_operator: a downstream operator needs hash
+        // distribution for correctness, and the chain to here goes through HASH or NOOP
+        // requirements (so the dependency is preserved).
+        //   propagate = ((inheritedShuffled || self.requiresShuffleForCorrectness)
+        //                && require is hash)
+        //            || (inheritedShuffled && require is noop/passthrough)
+        boolean inheritedShuffled = translatorContext.hasShuffleForCorrectnessAncestor(this);
+        boolean selfOrInheritedShuffled = inheritedShuffled || requiresShuffleForCorrectness();
+        boolean requireIsHash = require.preferType().isHashShuffle();
+        boolean requireIsNoop = require.preferType() == LocalExchangeNode.LocalExchangeType.NOOP;
+        boolean childShuffledAncestor = (selfOrInheritedShuffled && requireIsHash)
+                || (inheritedShuffled && requireIsNoop);
+        translatorContext.setHasShuffleForCorrectnessAncestor(child, childShuffledAncestor);
+
+        // 2. Recurse child (Layer 2: child declares its own require/output)
+        Pair<PlanNode, LocalExchangeType> childOutput =
+                child.enforceAndDeriveLocalExchange(translatorContext, this, require);
+
+        // Steps 2.5 and 3 both react to a serial child but address different concerns:
+        //   - Step 2.5 rewrites the OUTPUT-side view (what we tell satisfy/parent about
+        //     the child's actual distribution).  A serial pipeline runs with 1 task so
+        //     its distribution claim is meaningless — flatten to NOOP so the satisfy
+        //     check below doesn't get fooled by a stale "I output BUCKET_HASH" claim.
+        //   - Step 3 rewrites the REQUIRE-side decision (what we want from the child).
+        //     If we previously asked for nothing (noRequire) but the child turns out
+        //     to be serial and we're not, upgrade to requirePassthrough so an LE is
+        //     inserted to restore parallelism.
+
+        // 2.5. Serial child override (output side): if child is serial on BE, force its
+        //      reported output to NOOP.  Distribution is irrelevant when the child runs
+        //      with 1 task; downstream parallelism is restored either by step 3 (LE
+        //      insertion) or skipped entirely by step 4b (we're also serial).
+        if (childOutput.first.isSerialOperatorOnBe(translatorContext.getConnectContext())) {
+            childOutput = Pair.of(childOutput.first, LocalExchangeType.NOOP);
+        }
+
+        // 3. Framework-level serial child check (require side, mirrors BE base class
+        //    required_data_distribution): if child will be serial on BE but this node is
+        //    not serial, the pipeline has a 1-task serial child feeding an N-task non-serial
+        //    parent.  Without LE, pipeline splits (AGG/JOIN) create paired pipelines with
+        //    mismatched num_tasks → crash.  Upgrade noRequire to requirePassthrough so an
+        //    LE is inserted below to restore parallelism.
+        if (require instanceof LocalExchangeNode.NoRequire
+                && childOutput.first.isSerialOperatorOnBe(translatorContext.getConnectContext())
+                && !isSerialOperatorOnBe(translatorContext.getConnectContext())) {
+            require = LocalExchangeTypeRequire.requirePassthrough();
+        }
+
+        // 4. Satisfy check: child output meets requirement → done
+        if (require.satisfy(childOutput.second)) {
+            return childOutput;
+        }
+
+        // 4b. Layer 1: skip LE when serial operator or ancestor in same pipeline
+        // Equivalent to BE's need_to_local_exchange: any_of(operators[idx..end], is_serial) → skip.
+        // Use isSerialOperatorOnBe (not isSerialNode) because BE's Pipeline::need_to_local_exchange
+        // checks op->is_serial_operator() which reads the Thrift flag set from isSerialOperatorOnBe;
+        // when fragment.useSerialSource is false, BE treats this node as non-serial.
+        if (translatorContext.hasSerialAncestorInPipeline(this)
+                || isSerialOperatorOnBe(translatorContext.getConnectContext())) {
+            return childOutput;
+        }
+
+        // 5. Resolve exchange type and create LE node
+        LocalExchangeType preferType = AddLocalExchange.resolveExchangeType(require);
+        List<Expr> distributeExprs = getLocalExchangeDistributeExprs(childIndex, selfOrInheritedShuffled);
+        PlanNode leNode = createLocalExchange(translatorContext, childOutput.first, preferType, distributeExprs);
+        return Pair.of(leNode, preferType);
+    }
+
+    /**
+     * Create a LocalExchangeNode wrapping child with the given exchange type.
+     * No child-type skip — matches BE's _add_local_exchange which inserts LE for any child
+     * type without checking instanceof.
+     *
+     * Handles heavy-ops bottleneck avoidance (mirrors BE pipeline_fragment_context.cpp):
+     * when upstream has 1 task (serial source) and exchange is heavy (hash/bucket/adaptive),
+     * insert a PASSTHROUGH fan-out first to avoid single-task bottleneck on the heavy
+     * exchange sink. Only applies to local-shuffle (pooling scan) fragments.
+     */
+    protected PlanNode createLocalExchange(PlanTranslatorContext translatorContext,
+            PlanNode child, LocalExchangeType exchangeType, List<Expr> distributeExprs) {
+        if (fragment != null && fragment.useSerialSource(translatorContext.getConnectContext())
+                && exchangeType.isHeavyOperation() && child.isSerialNode()) {
+            PlanNode ptNode = new LocalExchangeNode(translatorContext.nextPlanNodeId(),
+                    child, LocalExchangeType.PASSTHROUGH, null);
+            return new LocalExchangeNode(translatorContext.nextPlanNodeId(), ptNode,
+                    exchangeType, distributeExprs);
+        }
+        return new LocalExchangeNode(translatorContext.nextPlanNodeId(), child,
+                exchangeType, distributeExprs);
+    }
+
+    /**
+     * Whether the child at {@code childIndex} starts a new pipeline context, causing
+     * its serial-ancestor flag to be reset to {@code false} rather than inherited from this node.
+     * Override to return {@code true} for pipeline-splitting nodes (LocalExchangeNode) and nodes
+     * whose children run in an independent pipeline segment (SortNode above analytic, etc.).

Review Comment:
   Should "above" be "before" here, i.e. "SortNode before analytic"?
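
   For readers checking the flag logic in the quoted `enforceRequire`, here is a minimal standalone sketch of two of its decisions: the shuffle-for-correctness propagation rule, and the choice between keeping the child and inserting a LocalExchange (the satisfy check followed by the Layer 1 skip). It is only a model under stated assumptions: the class `EnforceRequireSketch`, the enum `Decision`, and both method names are hypothetical and do not exist in Doris, and the booleans stand in for the real calls on `PlanTranslatorContext` and `LocalExchangeTypeRequire`.

```java
// Hypothetical model of two decisions in the quoted enforceRequire; not Doris code.
final class EnforceRequireSketch {
    enum Decision { KEEP_CHILD, INSERT_LOCAL_EXCHANGE }

    private EnforceRequireSketch() {
    }

    // Shuffle-for-correctness propagation, following the quoted condition:
    // (selfOrInherited && requireIsHash) || (inherited && requireIsNoop).
    static boolean childShuffledAncestor(boolean inheritedShuffled, boolean selfRequiresShuffle,
            boolean requireIsHash, boolean requireIsNoop) {
        boolean selfOrInheritedShuffled = inheritedShuffled || selfRequiresShuffle;
        return (selfOrInheritedShuffled && requireIsHash)
                || (inheritedShuffled && requireIsNoop);
    }

    // Satisfy check first, then the Layer 1 skip, then LE insertion.
    static Decision decide(boolean requireSatisfied, boolean serialAncestorInPipeline,
            boolean selfSerialOnBe) {
        if (requireSatisfied) {
            // Child output already meets the requirement.
            return Decision.KEEP_CHILD;
        }
        if (serialAncestorInPipeline || selfSerialOnBe) {
            // This node or an ancestor in the same pipeline is serial: skip the LE.
            return Decision.KEEP_CHILD;
        }
        return Decision.INSERT_LOCAL_EXCHANGE;
    }
}
```

   The sketch follows the quoted condition, which tests the NOOP prefer type only, whereas the comment above that condition says "noop/passthrough"; which of the two is intended is not something this sketch can settle.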



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

