morrySnow commented on code in PR #63366:
URL: https://github.com/apache/doris/pull/63366#discussion_r3414840309
##########
fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java:
##########
@@ -939,4 +1031,263 @@ private String mergeIcebergAccessPathsWithId(
}
return StringUtils.join(mergeDisplayAccessPaths, ", ");
}
+
+ /**
+ * Default local-exchange planning for a node: runs every child through
+ * {@link #enforceRequire} with {@code LocalExchangeTypeRequire.noRequire()},
+ * replaces {@code children} with the (possibly LE-wrapped) results, and
+ * reports {@code LocalExchangeType.NOOP} as this node's output.
+ *
+ * <p>{@code parent} and {@code parentRequire} are not read by this
+ * default implementation. NOTE(review): presumably node types with real
+ * distribution requirements override this method — confirm.
+ *
+ * @param translatorContext context forwarded to {@code enforceRequire}
+ * @param parent the node above this one (unused here)
+ * @param parentRequire the parent's requirement on this node's output
+ *        (unused here)
+ * @return this node paired with {@code LocalExchangeType.NOOP}
+ */
+ public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(
+ PlanTranslatorContext translatorContext, PlanNode parent,
LocalExchangeTypeRequire parentRequire) {
+ ArrayList<PlanNode> newChildren = Lists.newArrayList();
+ for (int i = 0; i < children.size(); i++) {
+ // No requirement is imposed on the child; enforceRequire may still
+ // wrap it in a local exchange, so keep the returned node.
+ Pair<PlanNode, LocalExchangeType> childOutput
+ = enforceRequire(translatorContext, children.get(i), i,
LocalExchangeTypeRequire.noRequire());
+ newChildren.add(childOutput.first);
+ }
+ this.children = newChildren;
+ return Pair.of(this, LocalExchangeType.NOOP);
+ }
+
+ /**
+ * Unified framework method: propagate serial flag → recurse child →
satisfy check → Layer 1 skip → insert LE.
+ * Replaces the old
enforceChild/enforceChildExchange/forceEnforceChildExchange trio.
+ *
+ * <h3>Data flow</h3>
+ * <ul>
+ * <li><b>serial-ancestor flag</b> ({@link
PlanTranslatorContext#hasSerialAncestorInPipeline})
+ * — flows root → leaf during traversal. Mirrors BE's
+ * {@code any_of(operators[idx..end], is_serial_operator)} check
used by
+ * {@code _add_local_exchange} to skip LE insertion when an ancestor
in the same
+ * pipeline is already serial. Reset at pipeline boundaries via
+ * {@link #shouldResetSerialFlagForChild}.</li>
+ * <li><b>shuffle-for-correctness flag</b>
+ * ({@link PlanTranslatorContext#hasShuffleForCorrectnessAncestor})
— also flows
+ * root → leaf. Mirrors BE's {@code
_followed_by_shuffled_operator}: tells a
+ * child whether some downstream operator depends on hash
distribution for
+ * correctness, so {@code SetOperationNode} can pre-shuffle union
branches.</li>
+ * <li><b>return value</b> {@code Pair<PlanNode, LocalExchangeType>} —
first is the
+ * (possibly LE-wrapped) child; second is the actual output
distribution as
+ * observed by the parent. Caller's {@code require.satisfy(output)}
decides
+ * whether more LE is needed.</li>
+ * <li><b>parent.require</b> describes the constraint on the child
output —
+ * computed inside the parent's {@code
enforceAndDeriveLocalExchange} per child.</li>
+ * </ul>
+ *
+ * <h3>Invariants</h3>
+ * <ul>
+ * <li>Where a serial → non-serial transition needs redistribution,
framework step 3 inserts
+ * the LE (e.g. a serial source fanned out via PASSTHROUGH). This
is not a hard invariant:
+ * a serial child feeding a parent that requires PASSTHROUGH /
noRequire (TableFunction,
+ * NLJ, Agg) is already correct and needs no LE, so it is
intentionally not validated by a
+ * post-pass.</li>
+ * <li>{@code LocalExchangeNode} itself is always non-serial — setting
it serial
+ * would defeat its purpose of fanning a 1-task pipeline back to N
tasks.</li>
+ * <li>For pipeline-breaking parents ({@code
shouldResetSerialFlagForChild=true}),
+ * the child starts a fresh pipeline so {@code hasSerialAncestor} is
reset; the
+ * node's own {@code isSerialNode()} still composes in for the
child's view.</li>
+ * <li>{@code RequireHash} accepts any hash flavour; {@code
RequireSpecific} demands
+ * an exact match (with the one PASSTHROUGH/ADAPTIVE_PASSTHROUGH
compatibility).
+ * Pick the looser one whenever correctness allows — see
+ * {@link LocalExchangeNode.LocalExchangeTypeRequire}.</li>
+ * </ul>
+ *
+ * <h3>Layers</h3>
+ * Layer 1 (shouldSkipLE): mirrors BE's need_to_local_exchange — skip when
this node or
+ * an ancestor in the same pipeline is serial (operators[idx..end] has
serial → skip).
+ * Layer 2 (require/output): each Node declares require and output in
enforceAndDeriveLocalExchange.
+ */
+ protected Pair<PlanNode, LocalExchangeType> enforceRequire(
+ PlanTranslatorContext translatorContext, PlanNode child, int
childIndex,
+ LocalExchangeTypeRequire require) {
+ // 1. Propagate serial-ancestor flag to child.
+ // For pipeline-splitting operators (shouldReset=true, e.g.
non-streaming AGG):
+ // Drop inherited serial flag from parent (parent is in a different
pipeline),
+ // but keep this node's own serial status (child is in the same
pipeline as this
+ // node's sink, e.g. Exchange is in AGG_Sink pipeline).
+ // For non-splitting operators (shouldReset=false, e.g. streaming AGG):
+ // Inherit parent's serial flag + this node's own.
+ boolean inheritedSerial = shouldResetSerialFlagForChild(childIndex)
+ ? false : translatorContext.hasSerialAncestorInPipeline(this);
+ // Use isSerialOperatorOnBe (= isSerialNode &&
fragment.useSerialSource) instead of the
+ // raw isSerialNode(). BE's OperatorBase reads the Thrift
`is_serial_operator` flag —
+ // which is what FE writes via isSerialOperatorOnBe — so when the
fragment is not in
+ // serial-source mode, BE treats this operator as non-serial
regardless of isSerialNode.
+ // Using isSerialNode here would set the child's serial-ancestor flag
wider than BE's
+ // view and over-skip required LocalExchanges downstream.
+ boolean childHasSerialAncestor = inheritedSerial
+ || isSerialOperatorOnBe(translatorContext.getConnectContext());
+ translatorContext.setHasSerialAncestorInPipeline(child,
childHasSerialAncestor);
+
+ // 1b. Propagate shuffle-for-correctness-ancestor flag to child.
+ // Mirrors BE's _followed_by_shuffled_operator: a downstream operator
needs hash
+ // distribution for correctness, and the chain to here goes through
HASH or NOOP
+ // requirements (so the dependency is preserved).
+ // propagate = ((inheritedShuffled ||
self.requiresShuffleForCorrectness)
+ // && require is hash)
+ // || (inheritedShuffled && require is noop/passthrough)
+ boolean inheritedShuffled =
translatorContext.hasShuffleForCorrectnessAncestor(this);
+ boolean selfOrInheritedShuffled = inheritedShuffled ||
requiresShuffleForCorrectness();
+ boolean requireIsHash = require.preferType().isHashShuffle();
+ boolean requireIsNoop = require.preferType() ==
LocalExchangeNode.LocalExchangeType.NOOP;
+ boolean childShuffledAncestor = (selfOrInheritedShuffled &&
requireIsHash)
+ || (inheritedShuffled && requireIsNoop);
+ translatorContext.setHasShuffleForCorrectnessAncestor(child,
childShuffledAncestor);
+
+ // 2. Recurse child (Layer 2: child declares its own require/output)
+ Pair<PlanNode, LocalExchangeType> childOutput =
+ child.enforceAndDeriveLocalExchange(translatorContext, this,
require);
+
+ // Steps 2.5 and 3 both react to a serial child but address different
concerns:
+ // - Step 2.5 rewrites the OUTPUT-side view (what we tell
satisfy/parent about
+ // the child's actual distribution). A serial pipeline runs with
1 task so
+ // its distribution claim is meaningless — flatten to NOOP so the
satisfy
+ // check below doesn't get fooled by a stale "I output
BUCKET_HASH" claim.
+ // - Step 3 rewrites the REQUIRE-side decision (what we want from
the child).
+ // If we previously asked for nothing (noRequire) but the child
turns out
+ // to be serial and we're not, upgrade to requirePassthrough so an
LE is
+ // inserted to restore parallelism.
+
+ // 2.5. Serial child override (output side): if child is serial on BE,
force its
+ // reported output to NOOP. Distribution is irrelevant when the
child runs
+ // with 1 task; downstream parallelism is restored either by step
3 (LE
+ // insertion) or skipped entirely by step 4b (we're also serial).
+ if
(childOutput.first.isSerialOperatorOnBe(translatorContext.getConnectContext()))
{
+ childOutput = Pair.of(childOutput.first, LocalExchangeType.NOOP);
+ }
+
+ // 3. Framework-level serial child check (require side, mirrors BE
base class
+ // required_data_distribution): if child will be serial on BE but
this node is
+ // not serial, the pipeline has a 1-task serial child feeding an
N-task non-serial
+ // parent. Without LE, pipeline splits (AGG/JOIN) create paired
pipelines with
+ // mismatched num_tasks → crash. Upgrade noRequire to
requirePassthrough so an
+ // LE is inserted below to restore parallelism.
+ if (require instanceof LocalExchangeNode.NoRequire
+ &&
childOutput.first.isSerialOperatorOnBe(translatorContext.getConnectContext())
+ &&
!isSerialOperatorOnBe(translatorContext.getConnectContext())) {
+ require = LocalExchangeTypeRequire.requirePassthrough();
+ }
+
+ // 4. Satisfy check: child output meets requirement → done
+ if (require.satisfy(childOutput.second)) {
+ return childOutput;
+ }
+
+ // 4. Layer 1: skip LE when serial operator or ancestor in same
pipeline
+ // Equivalent to BE's need_to_local_exchange:
any_of(operators[idx..end], is_serial) → skip.
+ // Use isSerialOperatorOnBe (not isSerialNode) because BE's
Pipeline::need_to_local_exchange
+ // checks op->is_serial_operator() which reads the Thrift flag set
from isSerialOperatorOnBe;
+ // when fragment.useSerialSource is false, BE treats this node as
non-serial.
+ if (translatorContext.hasSerialAncestorInPipeline(this)
Review Comment:
Why is it safe to skip adding a local exchange node when
`hasSerialAncestorInPipeline(this)` is true?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]