924060929 commented on code in PR #63366:
URL: https://github.com/apache/doris/pull/63366#discussion_r3259596727


##########
fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java:
##########
@@ -933,4 +1020,229 @@ private String mergeIcebergAccessPathsWithId(
         }
         return StringUtils.join(mergeDisplayAccessPaths, ", ");
     }
+
+    public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(
+            PlanTranslatorContext translatorContext, PlanNode parent, LocalExchangeTypeRequire parentRequire) {
+        ArrayList<PlanNode> newChildren = Lists.newArrayList();
+        for (int i = 0; i < children.size(); i++) {
+            Pair<PlanNode, LocalExchangeType> childOutput
+                    = enforceRequire(translatorContext, children.get(i), i, LocalExchangeTypeRequire.noRequire());
+            newChildren.add(childOutput.first);
+        }
+        this.children = newChildren;
+        return Pair.of(this, LocalExchangeType.NOOP);
+    }
+
+    /**
+     * Unified framework method: propagate serial flag → recurse child → satisfy check → Layer 1 skip → insert LE.
+     * Replaces the old enforceChild/enforceChildExchange/forceEnforceChildExchange trio.
+     *
+     * <h3>Data flow</h3>
+     * <ul>
+     *   <li><b>serial-ancestor flag</b> ({@link PlanTranslatorContext#hasSerialAncestorInPipeline})
+     *       — flows root → leaf during traversal.  Mirrors BE's
+     *       {@code any_of(operators[idx..end], is_serial_operator)} check used by
+     *       {@code _add_local_exchange} to skip LE insertion when an ancestor in the same
+     *       pipeline is already serial.  Reset at pipeline boundaries via
+     *       {@link #shouldResetSerialFlagForChild}.</li>
+     *   <li><b>shuffle-for-correctness flag</b>
+     *       ({@link PlanTranslatorContext#hasShuffleForCorrectnessAncestor}) — also flows
+     *       root → leaf.  Mirrors BE's {@code _followed_by_shuffled_operator}: tells a
+     *       child whether some downstream operator depends on hash distribution for
+     *       correctness, so {@code SetOperationNode} can pre-shuffle union branches.</li>
+     *   <li><b>return value</b> {@code Pair<PlanNode, LocalExchangeType>} — first is the
+     *       (possibly LE-wrapped) child; second is the actual output distribution as
+     *       observed by the parent.  Caller's {@code require.satisfy(output)} decides
+     *       whether more LE is needed.</li>
+     *   <li><b>parent.require</b> describes the constraint on the child output —
+     *       computed inside the parent's {@code enforceAndDeriveLocalExchange} per child.</li>
+     * </ul>
+     *
+     * <h3>Invariants</h3>
+     * <ul>
+     *   <li>Every serial → non-serial transition has an LE somewhere between them
+     *       (enforced by framework step 3 below, validated by
+     *       {@link AddLocalExchange#validateNoSerialWithoutLocalExchange}).</li>
+     *   <li>{@code LocalExchangeNode} itself is always non-serial — setting it serial
+     *       would defeat its purpose of fanning a 1-task pipeline back to N tasks.</li>
+     *   <li>For pipeline-breaking parents ({@code shouldResetSerialFlagForChild=true}),
+     *       the child starts a fresh pipeline so {@code hasSerialAncestor} is reset; the
+     *       node's own {@code isSerialNode()} still composes in for the child's view.</li>
+     *   <li>{@code RequireHash} accepts any hash flavour; {@code RequireSpecific} demands
+     *       an exact match (with the one PASSTHROUGH/ADAPTIVE_PASSTHROUGH compatibility).
+     *       Pick the looser one whenever correctness allows — see
+     *       {@link LocalExchangeNode.LocalExchangeTypeRequire}.</li>
+     * </ul>
+     *
+     * <h3>Layers</h3>
+     * Layer 1 (shouldSkipLE): mirrors BE's need_to_local_exchange — skip when this node or
+     * an ancestor in the same pipeline is serial (operators[idx..end] has serial → skip).
+     * Layer 2 (require/output): each Node declares require and output in enforceAndDeriveLocalExchange.
+     */
+    protected Pair<PlanNode, LocalExchangeType> enforceRequire(
+            PlanTranslatorContext translatorContext, PlanNode child, int childIndex,
+            LocalExchangeTypeRequire require) {
+        // 1. Propagate serial-ancestor flag to child.
+        // For pipeline-splitting operators (shouldReset=true, e.g. non-streaming AGG):
+        //   Drop inherited serial flag from parent (parent is in a different pipeline),
+        //   but keep this node's own serial status (child is in the same pipeline as this
+        //   node's sink, e.g. Exchange is in AGG_Sink pipeline).
+        // For non-splitting operators (shouldReset=false, e.g. streaming AGG):
+        //   Inherit parent's serial flag + this node's own.
+        boolean inheritedSerial = shouldResetSerialFlagForChild(childIndex)
+                ? false : translatorContext.hasSerialAncestorInPipeline(this);
+        boolean childHasSerialAncestor = inheritedSerial || isSerialNode();
+        translatorContext.setHasSerialAncestorInPipeline(child, childHasSerialAncestor);
+
+        // 1b. Propagate shuffle-for-correctness-ancestor flag to child.
+        // Mirrors BE's _followed_by_shuffled_operator: a downstream operator needs hash
+        // distribution for correctness, and the chain to here goes through HASH or NOOP
+        // requirements (so the dependency is preserved).
+        //   propagate = ((inheritedShuffled || self.requiresShuffleForCorrectness)
+        //                && require is hash)
+        //            || (inheritedShuffled && require is noop/passthrough)
+        boolean inheritedShuffled = translatorContext.hasShuffleForCorrectnessAncestor(this);
+        boolean selfOrInheritedShuffled = inheritedShuffled || requiresShuffleForCorrectness();
+        boolean requireIsHash = require.preferType().isHashShuffle();
+        boolean requireIsNoop = require.preferType() == LocalExchangeNode.LocalExchangeType.NOOP;
+        boolean childShuffledAncestor = (selfOrInheritedShuffled && requireIsHash)
+                || (inheritedShuffled && requireIsNoop);
+        translatorContext.setHasShuffleForCorrectnessAncestor(child, childShuffledAncestor);
+
+        // 2. Recurse child (Layer 2: child declares its own require/output)
+        Pair<PlanNode, LocalExchangeType> childOutput =
+                child.enforceAndDeriveLocalExchange(translatorContext, this, require);
+
+        // Steps 2.5 and 3 both react to a serial child but address different concerns:
+        //   - Step 2.5 rewrites the OUTPUT-side view (what we tell satisfy/parent about
+        //     the child's actual distribution).  A serial pipeline runs with 1 task so
+        //     its distribution claim is meaningless — flatten to NOOP so the satisfy
+        //     check below doesn't get fooled by a stale "I output BUCKET_HASH" claim.
+        //   - Step 3 rewrites the REQUIRE-side decision (what we want from the child).
+        //     If we previously asked for nothing (noRequire) but the child turns out
+        //     to be serial and we're not, upgrade to requirePassthrough so an LE is
+        //     inserted to restore parallelism.
+
+        // 2.5. Serial child override (output side): if child is serial on BE, force its
+        //      reported output to NOOP.  Distribution is irrelevant when the child runs
+        //      with 1 task; downstream parallelism is restored either by step 3 (LE
+        //      insertion) or skipped entirely by step 4b (we're also serial).
+        if (childOutput.first.isSerialOperatorOnBe(translatorContext.getConnectContext())) {
+            childOutput = Pair.of(childOutput.first, LocalExchangeType.NOOP);
+        }
+
+        // 3. Framework-level serial child check (require side, mirrors BE base class
+        //    required_data_distribution): if child will be serial on BE but this node is
+        //    not serial, the pipeline has a 1-task serial child feeding an N-task non-serial
+        //    parent.  Without LE, pipeline splits (AGG/JOIN) create paired pipelines with
+        //    mismatched num_tasks → crash.  Upgrade noRequire to requirePassthrough so an
+        //    LE is inserted below to restore parallelism.
+        if (require instanceof LocalExchangeNode.NoRequire
+                && childOutput.first.isSerialOperatorOnBe(translatorContext.getConnectContext())
+                && !isSerialOperatorOnBe(translatorContext.getConnectContext())) {
+            require = LocalExchangeTypeRequire.requirePassthrough();
+        }
+
+        // 4. Satisfy check: child output meets requirement → done
+        if (require.satisfy(childOutput.second)) {
+            return childOutput;
+        }
+
+        // 4b. Layer 1: skip LE when serial operator or ancestor in same pipeline
+        // Equivalent to BE's need_to_local_exchange: any_of(operators[idx..end], is_serial) → skip
+        if (translatorContext.hasSerialAncestorInPipeline(this) || isSerialNode()) {
+            return childOutput;
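The step 1b propagation rule in the hunk above is a pure function of three inputs, so it can be checked exhaustively. The following is a minimal standalone sketch, not Doris code. `RequireKind` is a hypothetical stand-in for the category of `require.preferType()`, and `childShuffledAncestor` is a hypothetical name. The sketch follows the code rather than the comment: only a NOOP requirement forwards an inherited flag, although the comment also mentions passthrough.

```java
// Hypothetical standalone restatement of step 1b in enforceRequire (not Doris code).
enum RequireKind { NOOP, HASH, PASSTHROUGH }

final class ShufflePropagation {
    private ShufflePropagation() {}

    /**
     * Whether the child should see a shuffle-for-correctness ancestor.
     *
     * @param inheritedShuffled   flag this node received from its parent
     * @param selfRequiresShuffle this node itself needs hash distribution for correctness
     * @param require             category of the requirement this node places on the child
     */
    static boolean childShuffledAncestor(boolean inheritedShuffled,
                                         boolean selfRequiresShuffle,
                                         RequireKind require) {
        boolean selfOrInherited = inheritedShuffled || selfRequiresShuffle;
        boolean requireIsHash = require == RequireKind.HASH;
        boolean requireIsNoop = require == RequireKind.NOOP;
        // A hash requirement carries either dependency down; NOOP forwards only an inherited one.
        return (selfOrInherited && requireIsHash) || (inheritedShuffled && requireIsNoop);
    }

    public static void main(String[] args) {
        boolean[] bools = {false, true};
        for (boolean inherited : bools) {
            for (boolean self : bools) {
                for (RequireKind r : RequireKind.values()) {
                    System.out.printf("inherited=%-5b self=%-5b require=%-11s -> %b%n",
                            inherited, self, r, childShuffledAncestor(inherited, self, r));
                }
            }
        }
    }
}
```

Running `main` prints all twelve rows of the truth table. Any PASSTHROUGH requirement clears the flag in this model.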

Review Comment:
   Good catch — fixed in 15d92ba024b. Both the Layer 1 skip and the 
serial-ancestor propagation in `enforceRequire` now use 
`isSerialOperatorOnBe(translatorContext.getConnectContext())` instead of the 
raw `isSerialNode()`.
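The following is a minimal sketch of the corrected flow. It assumes a hypothetical `Node` record in which `serialNode` stands in for `isSerialNode()`, `serialOnBe` for `isSerialOperatorOnBe(ctx)`, and `resetsFlag` for `shouldResetSerialFlagForChild(childIndex)`. It models only the two boolean decisions the fix touches. `serialNode` is carried along but deliberately never consulted.

```java
import java.util.List;

// Hypothetical model of the two checks the fix touches (not Doris code).
final class SerialFlagSketch {
    /**
     * serialNode ~ isSerialNode()                      (plan-level property; never consulted)
     * serialOnBe ~ isSerialOperatorOnBe(ctx)           (what BE will actually see)
     * resetsFlag ~ shouldResetSerialFlagForChild(idx)  (child starts a new pipeline)
     */
    record Node(String name, boolean serialNode, boolean serialOnBe, boolean resetsFlag) {}

    // Step 1: the serial-ancestor flag handed to the child.
    static boolean childHasSerialAncestor(Node self, boolean selfHasSerialAncestor) {
        boolean inherited = !self.resetsFlag() && selfHasSerialAncestor;
        return inherited || self.serialOnBe();
    }

    // Layer 1: skip inserting a LocalExchange between this node and its child.
    static boolean skipLocalExchange(Node self, boolean selfHasSerialAncestor) {
        return selfHasSerialAncestor || self.serialOnBe();
    }

    public static void main(String[] args) {
        Node nominal = new Node("nominal", true, false, false);     // serial in the plan only
        Node effective = new Node("effective", true, true, false);  // serial on BE too
        for (Node n : List.of(nominal, effective)) {
            System.out.printf("%-9s skipLE=%-5b childHasSerialAncestor=%b%n",
                    n.name(), skipLocalExchange(n, false), childHasSerialAncestor(n, false));
        }
    }
}
```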
   
   Verified that BE's `OperatorBase` initializes its serial status from the Thrift
`is_serial_operator` flag, which FE writes via `isSerialOperatorOnBe`, not
`isSerialNode`. As a result, the `op->is_serial_operator()` check in
`Pipeline::need_to_local_exchange` returns false whenever
`fragment.useSerialSource(ctx)` is false, even if `isSerialNode()` is true. The
previous code would therefore have over-skipped LocalExchange in exactly the
scenarios you listed (`ignore_storage_data_distribution=false`, query cache, NAAJ).
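The sketch below shows why the old check over-skipped. It evaluates BE's `any_of(operators[idx..end], is_serial_operator)` rule next to a pre-fix FE approximation that keyed off the plan-level flag instead. It is a hypothetical Java model, since the real BE logic is C++. It flattens FE's tree-based flag into a list scan and assumes source-first operator order, so that `idx..end` covers the insertion point and everything downstream of it.

```java
import java.util.List;

// Hypothetical Java model; the real check is C++ in BE's Pipeline::need_to_local_exchange.
final class BeSkipModel {
    /** serialNode: FE plan-level flag. isSerialOperator: value FE wrote to Thrift. */
    record Operator(String name, boolean serialNode, boolean isSerialOperator) {}

    // BE rule: any_of(operators[idx..end], is_serial_operator) -> skip the LocalExchange.
    static boolean beSkips(List<Operator> ops, int idx) {
        return ops.subList(idx, ops.size()).stream().anyMatch(Operator::isSerialOperator);
    }

    // Pre-fix FE approximation: same scan, but keyed off the plan-level flag.
    static boolean preFixFeSkips(List<Operator> ops, int idx) {
        return ops.subList(idx, ops.size()).stream().anyMatch(Operator::serialNode);
    }

    public static void main(String[] args) {
        // The middle operator is serial in the plan, but its fragment does not use a
        // serial source, so FE wrote is_serial_operator=false.
        List<Operator> pipeline = List.of(
                new Operator("source", false, false),
                new Operator("nominallySerial", true, false),
                new Operator("downstream", false, false));
        System.out.printf("BE skips: %b, pre-fix FE skips: %b%n",
                beSkips(pipeline, 1), preFixFeSkips(pipeline, 1));
    }
}
```

For the sample pipeline the two rules disagree. The pre-fix FE check would skip a LocalExchange even though BE does not treat that pipeline as serial.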
   
   Updated `LocalShuffleNodeCoverageTest.testMaterializationNode` and
`testSetOperationAndAssertNumRowsNode` to reflect the corrected behavior: in
the fragment-less unit-test path, `isSerialOperatorOnBe` returns false (because of
the `fragment != null` guard), so the framework no longer takes the Layer 1 skip
and instead inserts the required LocalExchange.
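The two paragraphs above imply a predicate of roughly the following shape. This is an inferred sketch, not the actual `isSerialOperatorOnBe` body. The `Fragment` and `Node` records are hypothetical, `useSerialSource` stands in for `fragment.useSerialSource(ctx)`, and the conjunction itself is assumed from the behavior described in this thread.

```java
// Hypothetical shape of isSerialOperatorOnBe, inferred from this review thread.
final class SerialOnBeSketch {
    /** Stand-in for the plan fragment; useSerialSource models fragment.useSerialSource(ctx). */
    record Fragment(boolean useSerialSource) {}

    /** fragment may be null, as in the fragment-less unit-test path. */
    record Node(boolean serialNode, Fragment fragment) {
        boolean isSerialOperatorOnBe() {
            // Null guard first, then the fragment-level switch, then the node's own property.
            return fragment != null && fragment.useSerialSource() && serialNode;
        }
    }

    public static void main(String[] args) {
        Node[] cases = {
            new Node(true, new Fragment(true)),   // serial in the plan and on BE
            new Node(true, new Fragment(false)),  // plan-level serial, no serial source
            new Node(true, null),                 // unit test without a fragment
            new Node(false, new Fragment(true)),  // not serial at all
        };
        for (Node n : cases) {
            System.out.printf("serialNode=%-5b fragment=%s -> onBe=%b%n",
                    n.serialNode(), n.fragment(), n.isSerialOperatorOnBe());
        }
    }
}
```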
   
   Other `isSerialNode()` call sites in PlanNode.java were audited and left 
as-is:
   - `toThrift()` already uses `isSerialOperatorOnBe`
   - `hasSerialChildren()` is a pure node-level tree walk used only for 
fragment-internal heuristics
   - the heavy-op gate in `createLocalExchange()` is already inside a
`fragment.useSerialSource(ctx)` branch, so `isSerialNode` and
`isSerialOperatorOnBe` are equivalent there
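The `require.satisfy(childOutput.second)` checks in the hunk depend on the contract stated in the Javadoc invariants. `RequireHash` accepts any hash flavour, while `RequireSpecific` demands an exact match apart from the PASSTHROUGH/ADAPTIVE_PASSTHROUGH compatibility. The following is a hypothetical model of that contract and of the step 3 upgrade. The enum constants are illustrative. Three points are assumptions: the passthrough compatibility is treated as symmetric, `NoRequire` accepts every output, and `requirePassthrough()` behaves like an exact PASSTHROUGH requirement.

```java
import java.util.EnumSet;

// Hypothetical model of the LocalExchangeTypeRequire contract (not the Doris classes).
final class RequireSketch {
    enum ExchangeType {
        NOOP, PASSTHROUGH, ADAPTIVE_PASSTHROUGH, HASH_SHUFFLE, BUCKET_HASH_SHUFFLE;

        boolean isHashShuffle() {
            return this == HASH_SHUFFLE || this == BUCKET_HASH_SHUFFLE;
        }
    }

    interface Require {
        boolean satisfy(ExchangeType output);
    }

    /** Assumed to accept every output. */
    record NoRequire() implements Require {
        public boolean satisfy(ExchangeType output) {
            return true;
        }
    }

    /** Accepts any hash flavour. */
    record RequireHash() implements Require {
        public boolean satisfy(ExchangeType output) {
            return output.isHashShuffle();
        }
    }

    /** Exact match; the two passthrough types are treated as interchangeable (assumed symmetric). */
    record RequireSpecific(ExchangeType type) implements Require {
        private static final EnumSet<ExchangeType> PASSTHROUGHS =
                EnumSet.of(ExchangeType.PASSTHROUGH, ExchangeType.ADAPTIVE_PASSTHROUGH);

        public boolean satisfy(ExchangeType output) {
            return output == type
                    || (PASSTHROUGHS.contains(type) && PASSTHROUGHS.contains(output));
        }
    }

    // Step 3: a serial child under a non-serial parent must not keep NoRequire.
    static Require upgradeForSerialChild(Require require, boolean childSerialOnBe,
                                         boolean selfSerialOnBe) {
        if (require instanceof NoRequire && childSerialOnBe && !selfSerialOnBe) {
            return new RequireSpecific(ExchangeType.PASSTHROUGH); // ~ requirePassthrough()
        }
        return require;
    }

    public static void main(String[] args) {
        ExchangeType childOutput = ExchangeType.NOOP; // step 2.5 flattened the serial child
        Require require = upgradeForSerialChild(new NoRequire(), true, false);
        System.out.println(require + " satisfied by " + childOutput + ": "
                + require.satisfy(childOutput));
    }
}
```

The `main` method walks through the serial-child case. Step 2.5 has already flattened the child's output to NOOP. Once step 3 upgrades the requirement, NOOP no longer satisfies it, so an LE would be inserted unless the Layer 1 skip applies.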



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
