924060929 commented on code in PR #63366:
URL: https://github.com/apache/doris/pull/63366#discussion_r3259596727
##########
fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java:
##########
@@ -933,4 +1020,229 @@ private String mergeIcebergAccessPathsWithId(
}
return StringUtils.join(mergeDisplayAccessPaths, ", ");
}
+
+ public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(
+ PlanTranslatorContext translatorContext, PlanNode parent, LocalExchangeTypeRequire parentRequire) {
+ ArrayList<PlanNode> newChildren = Lists.newArrayList();
+ for (int i = 0; i < children.size(); i++) {
+ Pair<PlanNode, LocalExchangeType> childOutput
+ = enforceRequire(translatorContext, children.get(i), i, LocalExchangeTypeRequire.noRequire());
+ newChildren.add(childOutput.first);
+ }
+ this.children = newChildren;
+ return Pair.of(this, LocalExchangeType.NOOP);
+ }
+
+ /**
+ * Unified framework method: propagate serial flag → recurse child → satisfy check → Layer 1 skip → insert LE.
+ * Replaces the old enforceChild/enforceChildExchange/forceEnforceChildExchange trio.
+ *
+ * <h3>Data flow</h3>
+ * <ul>
+ * <li><b>serial-ancestor flag</b> ({@link PlanTranslatorContext#hasSerialAncestorInPipeline})
+ * flows root → leaf during traversal. Mirrors BE's
+ * {@code any_of(operators[idx..end], is_serial_operator)} check used by
+ * {@code _add_local_exchange} to skip LE insertion when an ancestor in the same
+ * pipeline is already serial. Reset at pipeline boundaries via
+ * {@link #shouldResetSerialFlagForChild}.</li>
+ * <li><b>shuffle-for-correctness flag</b>
+ * ({@link PlanTranslatorContext#hasShuffleForCorrectnessAncestor}) also flows
+ * root → leaf. Mirrors BE's {@code _followed_by_shuffled_operator}: tells a
+ * child whether some downstream operator depends on hash distribution for
+ * correctness, so {@code SetOperationNode} can pre-shuffle union branches.</li>
+ * <li><b>return value</b> {@code Pair<PlanNode, LocalExchangeType>}: first is the
+ * (possibly LE-wrapped) child; second is the actual output distribution as
+ * observed by the parent. Caller's {@code require.satisfy(output)} decides
+ * whether more LE is needed.</li>
+ * <li><b>parent.require</b> describes the constraint on the child output,
+ * computed inside the parent's {@code enforceAndDeriveLocalExchange} per child.</li>
+ * </ul>
+ *
+ * <h3>Invariants</h3>
+ * <ul>
+ * <li>Every serial → non-serial transition has an LE somewhere between them
+ * (enforced by framework step 3 below, validated by
+ * {@link AddLocalExchange#validateNoSerialWithoutLocalExchange}).</li>
+ * <li>{@code LocalExchangeNode} itself is always non-serial; setting it serial
+ * would defeat its purpose of fanning a 1-task pipeline back to N tasks.</li>
+ * <li>For pipeline-breaking parents ({@code shouldResetSerialFlagForChild=true}),
+ * the child starts a fresh pipeline so {@code hasSerialAncestor} is reset; the
+ * node's own {@code isSerialNode()} still composes in for the child's view.</li>
+ * <li>{@code RequireHash} accepts any hash flavour; {@code RequireSpecific} demands
+ * an exact match (with the one PASSTHROUGH/ADAPTIVE_PASSTHROUGH compatibility).
+ * Pick the looser one whenever correctness allows; see
+ * {@link LocalExchangeNode.LocalExchangeTypeRequire}.</li>
+ * </ul>
+ *
+ * <h3>Layers</h3>
+ * Layer 1 (shouldSkipLE): mirrors BE's need_to_local_exchange; skip when this node or
+ * an ancestor in the same pipeline is serial (operators[idx..end] has serial → skip).
+ * Layer 2 (require/output): each Node declares require and output in enforceAndDeriveLocalExchange.
+ */
+ protected Pair<PlanNode, LocalExchangeType> enforceRequire(
+ PlanTranslatorContext translatorContext, PlanNode child, int childIndex,
+ LocalExchangeTypeRequire require) {
+ // 1. Propagate serial-ancestor flag to child.
+ // For pipeline-splitting operators (shouldReset=true, e.g. non-streaming AGG):
+ // Drop inherited serial flag from parent (parent is in a different pipeline),
+ // but keep this node's own serial status (child is in the same pipeline as this
+ // node's sink, e.g. Exchange is in AGG_Sink pipeline).
+ // For non-splitting operators (shouldReset=false, e.g. streaming AGG):
+ // Inherit parent's serial flag + this node's own.
+ boolean inheritedSerial = shouldResetSerialFlagForChild(childIndex)
+ ? false : translatorContext.hasSerialAncestorInPipeline(this);
+ boolean childHasSerialAncestor = inheritedSerial || isSerialNode();
+ translatorContext.setHasSerialAncestorInPipeline(child, childHasSerialAncestor);
+
+ // 1b. Propagate shuffle-for-correctness-ancestor flag to child.
+ // Mirrors BE's _followed_by_shuffled_operator: a downstream operator needs hash
+ // distribution for correctness, and the chain to here goes through HASH or NOOP
+ // requirements (so the dependency is preserved).
+ // propagate = ((inheritedShuffled || self.requiresShuffleForCorrectness)
+ // && require is hash)
+ // || (inheritedShuffled && require is noop/passthrough)
+ boolean inheritedShuffled = translatorContext.hasShuffleForCorrectnessAncestor(this);
+ boolean selfOrInheritedShuffled = inheritedShuffled || requiresShuffleForCorrectness();
+ boolean requireIsHash = require.preferType().isHashShuffle();
+ boolean requireIsNoop = require.preferType() == LocalExchangeNode.LocalExchangeType.NOOP;
+ boolean childShuffledAncestor = (selfOrInheritedShuffled && requireIsHash)
+ || (inheritedShuffled && requireIsNoop);
+ translatorContext.setHasShuffleForCorrectnessAncestor(child, childShuffledAncestor);
+
+ // 2. Recurse child (Layer 2: child declares its own require/output)
+ Pair<PlanNode, LocalExchangeType> childOutput =
+ child.enforceAndDeriveLocalExchange(translatorContext, this, require);
+
+ // Steps 2.5 and 3 both react to a serial child but address different concerns:
+ // - Step 2.5 rewrites the OUTPUT-side view (what we tell satisfy/parent about
+ // the child's actual distribution). A serial pipeline runs with 1 task so
+ // its distribution claim is meaningless; flatten to NOOP so the satisfy
+ // check below doesn't get fooled by a stale "I output BUCKET_HASH" claim.
+ // - Step 3 rewrites the REQUIRE-side decision (what we want from the child).
+ // If we previously asked for nothing (noRequire) but the child turns out
+ // to be serial and we're not, upgrade to requirePassthrough so an LE is
+ // inserted to restore parallelism.
+ // inserted to restore parallelism.
+
+ // 2.5. Serial child override (output side): if child is serial on BE, force its
+ // reported output to NOOP. Distribution is irrelevant when the child runs
+ // with 1 task; downstream parallelism is then either restored by step 3 (LE
+ // insertion) or deliberately not restored because step 4b skips the LE (we're also serial).
+ if (childOutput.first.isSerialOperatorOnBe(translatorContext.getConnectContext())) {
+ childOutput = Pair.of(childOutput.first, LocalExchangeType.NOOP);
+ }
+
+ // 3. Framework-level serial child check (require side, mirrors BE base class
+ // required_data_distribution): if child will be serial on BE but this node is
+ // not serial, the pipeline has a 1-task serial child feeding an N-task non-serial
+ // parent. Without LE, pipeline splits (AGG/JOIN) create paired pipelines with
+ // mismatched num_tasks → crash. Upgrade noRequire to requirePassthrough so an
+ // LE is inserted below to restore parallelism.
+ if (require instanceof LocalExchangeNode.NoRequire
+ && childOutput.first.isSerialOperatorOnBe(translatorContext.getConnectContext())
+ && !isSerialOperatorOnBe(translatorContext.getConnectContext())) {
+ require = LocalExchangeTypeRequire.requirePassthrough();
+ }
+
+ // 4. Satisfy check: child output meets requirement → done
+ if (require.satisfy(childOutput.second)) {
+ return childOutput;
+ }
+
+ // 4b. Layer 1: skip LE when this node or an ancestor in the same pipeline is serial.
+ // Equivalent to BE's need_to_local_exchange: any_of(operators[idx..end], is_serial) → skip
+ if (translatorContext.hasSerialAncestorInPipeline(this) || isSerialNode()) {
+ return childOutput;
Review Comment:
Good catch — fixed in 15d92ba024b. Both the Layer 1 skip and the
serial-ancestor propagation in `enforceRequire` now use
`isSerialOperatorOnBe(translatorContext.getConnectContext())` instead of the
raw `isSerialNode()`.
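To make the change concrete, here is a minimal, self-contained sketch of both decisions, before and after the fix. It is a simplified model, not the actual `PlanNode` code. The `SerialFlagFixSketch` and `Node` classes are hypothetical, and their two boolean fields stand in for `isSerialNode()` and `isSerialOperatorOnBe(translatorContext.getConnectContext())`.

```java
/**
 * Hypothetical model of the two enforceRequire call sites touched by the fix.
 * Not the real PlanNode API: serialNode stands in for isSerialNode(), and
 * serialOnBe for isSerialOperatorOnBe(translatorContext.getConnectContext()).
 */
final class SerialFlagFixSketch {

    static final class Node {
        final boolean serialNode;
        final boolean serialOnBe;

        Node(boolean serialNode, boolean serialOnBe) {
            this.serialNode = serialNode;
            this.serialOnBe = serialOnBe;
        }
    }

    /** Step 1 after the fix: the serial-ancestor flag handed to the child. */
    static boolean childHasSerialAncestor(Node self, boolean resetForChild, boolean inherited) {
        // Pipeline-splitting parents drop the inherited flag; the node's own
        // BE-visible serial status is still OR-ed in for the child's pipeline.
        return (!resetForChild && inherited) || self.serialOnBe;
    }

    /** Layer 1 before the fix: consulted the node-level flag. */
    static boolean skipBefore(Node self, boolean hasSerialAncestor) {
        return hasSerialAncestor || self.serialNode;
    }

    /** Layer 1 after the fix: consults the flag BE will actually see. */
    static boolean skipAfter(Node self, boolean hasSerialAncestor) {
        return hasSerialAncestor || self.serialOnBe;
    }

    public static void main(String[] args) {
        // Serial at node level, but its fragment does not use a serial source on BE.
        Node node = new Node(true, false);
        System.out.println("skip before fix: " + skipBefore(node, false)); // true (over-skip)
        System.out.println("skip after fix:  " + skipAfter(node, false));  // false, LE is inserted
        System.out.println("child flag:      " + childHasSerialAncestor(node, false, false)); // false
    }
}
```

The helpers take plain booleans so the sketch only isolates which flag each call site reads. It does not model how those flags are computed.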
Verified that BE's `OperatorBase` takes its serial status from the Thrift
`is_serial_operator` flag at construction time, and FE writes that flag via
`isSerialOperatorOnBe`, not `isSerialNode`. So `Pipeline::need_to_local_exchange`'s
`op->is_serial_operator()` check returns false when `fragment.useSerialSource(ctx)`
is false, even if `isSerialNode()` is true. The previous code would therefore have
over-skipped LocalExchange in exactly the scenarios you listed
(`ignore_storage_data_distribution=false`, query cache, NAAJ).
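The BE-side rule can be modelled the same way. The sketch below transliterates into Java the `any_of(operators[idx..end], is_serial_operator)` rule quoted in the Javadoc above. It assumes each operator's flag is copied verbatim from the Thrift `is_serial_operator` field. `NeedToLocalExchangeSketch`, `Operator`, and `needToLocalExchange` are hypothetical names. The real `Pipeline::need_to_local_exchange` has further conditions that are not shown here.

```java
import java.util.List;

/**
 * Hypothetical Java model of the serial check in BE's Pipeline::need_to_local_exchange.
 * Only the any_of(operators[idx..end], is_serial_operator) rule is modelled.
 */
final class NeedToLocalExchangeSketch {

    static final class Operator {
        final String name;
        final boolean isSerialOperator; // assumed to be copied from Thrift is_serial_operator

        Operator(String name, boolean isSerialOperator) {
            this.name = name;
            this.isSerialOperator = isSerialOperator;
        }
    }

    /** Returns false (skip the LE) if any operator from idx to the end of the pipeline is serial. */
    static boolean needToLocalExchange(List<Operator> operators, int idx) {
        for (int i = idx; i < operators.size(); i++) {
            if (operators.get(i).isSerialOperator) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // FE wrote is_serial_operator=false because useSerialSource(ctx) was false,
        // even if isSerialNode() was true for one of these nodes.
        List<Operator> pipeline = List.of(
                new Operator("SCAN", false), new Operator("AGG", false));
        System.out.println(needToLocalExchange(pipeline, 0)); // true: serial check does not suppress the LE

        List<Operator> serialPipeline = List.of(
                new Operator("SCAN", false), new Operator("AGG", true));
        System.out.println(needToLocalExchange(serialPipeline, 0)); // false: skip
    }
}
```

BE only sees the Thrift flag. An FE-side skip keyed on `isSerialNode()` therefore disagrees with this check whenever the two flags differ.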
Updated `LocalShuffleNodeCoverageTest.testMaterializationNode` and
`testSetOperationAndAssertNumRowsNode` to reflect the corrected behavior: in
the fragment-less unit-test path, `isSerialOperatorOnBe` returns false (the
`fragment != null` guard), so Layer 1 no longer skips and the framework inserts
the required LocalExchange.
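The unit-test change follows from the null guard. The minimal sketch below assumes `isSerialOperatorOnBe` reduces to `fragment != null && fragment.useSerialSource(ctx) && isSerialNode()`. That formula is inferred from this thread and was not checked against the source. `FragmentGuardSketch`, `Fragment`, and `Node` are hypothetical stand-ins.

```java
/**
 * Hypothetical model of the null-fragment guard. The real isSerialOperatorOnBe
 * may check more than this; the formula below is an assumption.
 */
final class FragmentGuardSketch {

    static final class Fragment {
        final boolean useSerialSource; // stands in for fragment.useSerialSource(ctx)

        Fragment(boolean useSerialSource) {
            this.useSerialSource = useSerialSource;
        }
    }

    static final class Node {
        final Fragment fragment;  // null in the fragment-less unit-test path
        final boolean serialNode; // stands in for isSerialNode()

        Node(Fragment fragment, boolean serialNode) {
            this.fragment = fragment;
            this.serialNode = serialNode;
        }

        boolean isSerialOperatorOnBe() {
            // Short-circuits on a null fragment before dereferencing it.
            return fragment != null && fragment.useSerialSource && serialNode;
        }
    }

    public static void main(String[] args) {
        Node unitTestNode = new Node(null, true);
        // false: Layer 1 does not skip, so the required LocalExchange is inserted.
        System.out.println(unitTestNode.isSerialOperatorOnBe());

        Node plannedNode = new Node(new Fragment(true), true);
        System.out.println(plannedNode.isSerialOperatorOnBe()); // true
    }
}
```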
Other `isSerialNode()` call sites in PlanNode.java were audited and left as-is:
- `toThrift()` already uses `isSerialOperatorOnBe`.
- `hasSerialChildren()` is a pure node-level tree walk used only for fragment-internal heuristics.
- The `createLocalExchange()` heavy-op gate is already inside a `fragment.useSerialSource(ctx)` branch, so `isSerialNode` and `isSerialOperatorOnBe` are equivalent there.
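The `createLocalExchange()` equivalence can be checked exhaustively, assuming `isSerialOperatorOnBe` reduces to `fragment != null && fragment.useSerialSource(ctx) && isSerialNode()`. That formula is inferred from this thread, not verified against the source. `HeavyOpGateEquivalenceSketch`, `serialOperatorOnBe`, and `countDisagreements` are hypothetical helpers. The sketch also counts the inputs outside the branch where the two flags diverge, which is why the other call sites did need the switch.

```java
/**
 * Hypothetical exhaustive check of the assumed isSerialOperatorOnBe formula
 * against isSerialNode(), inside and outside a useSerialSource branch.
 */
final class HeavyOpGateEquivalenceSketch {

    // Assumed formula; fragmentPresent stands in for `fragment != null`.
    static boolean serialOperatorOnBe(boolean fragmentPresent, boolean useSerialSource,
                                      boolean serialNode) {
        return fragmentPresent && useSerialSource && serialNode;
    }

    /** Counts inputs where the two flags differ, restricted to inside or outside the branch. */
    static int countDisagreements(boolean insideBranch) {
        int count = 0;
        boolean[] values = {false, true};
        for (boolean fragmentPresent : values) {
            for (boolean useSerialSource : values) {
                boolean branchTaken = fragmentPresent && useSerialSource;
                if (branchTaken != insideBranch) {
                    continue;
                }
                for (boolean serialNode : values) {
                    if (serialOperatorOnBe(fragmentPresent, useSerialSource, serialNode) != serialNode) {
                        count++;
                    }
                }
            }
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(countDisagreements(true));  // 0: equivalent inside the branch
        System.out.println(countDisagreements(false)); // 3: diverge whenever isSerialNode() is true
    }
}
```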
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]