morrySnow commented on code in PR #63366:
URL: https://github.com/apache/doris/pull/63366#discussion_r3412722149
##########
fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java:
##########
@@ -939,4 +1026,261 @@ private String mergeIcebergAccessPathsWithId(
}
return StringUtils.join(mergeDisplayAccessPaths, ", ");
}
+
+ public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(
+ PlanTranslatorContext translatorContext, PlanNode parent, LocalExchangeTypeRequire parentRequire) {
+ ArrayList<PlanNode> newChildren = Lists.newArrayList();
+ for (int i = 0; i < children.size(); i++) {
+ Pair<PlanNode, LocalExchangeType> childOutput
+ = enforceRequire(translatorContext, children.get(i), i, LocalExchangeTypeRequire.noRequire());
+ newChildren.add(childOutput.first);
+ }
+ this.children = newChildren;
+ return Pair.of(this, LocalExchangeType.NOOP);
+ }
+
+ /**
+ * Unified framework method: propagate serial flag → recurse child → satisfy check → Layer 1 skip → insert LE.
+ * Replaces the old enforceChild/enforceChildExchange/forceEnforceChildExchange trio.
+ *
+ * <h3>Data flow</h3>
+ * <ul>
+ * <li><b>serial-ancestor flag</b> ({@link PlanTranslatorContext#hasSerialAncestorInPipeline})
+ * — flows root → leaf during traversal. Mirrors BE's
+ * {@code any_of(operators[idx..end], is_serial_operator)} check used by
+ * {@code _add_local_exchange} to skip LE insertion when an ancestor in the same
+ * pipeline is already serial. Reset at pipeline boundaries via
+ * {@link #shouldResetSerialFlagForChild}.</li>
+ * <li><b>shuffle-for-correctness flag</b>
+ * ({@link PlanTranslatorContext#hasShuffleForCorrectnessAncestor}) — also flows
+ * root → leaf. Mirrors BE's {@code _followed_by_shuffled_operator}: tells a
+ * child whether some downstream operator depends on hash distribution for
+ * correctness, so {@code SetOperationNode} can pre-shuffle union branches.</li>
+ * <li><b>return value</b> {@code Pair<PlanNode, LocalExchangeType>} — first is the
+ * (possibly LE-wrapped) child; second is the actual output distribution as
+ * observed by the parent. Caller's {@code require.satisfy(output)} decides
+ * whether more LE is needed.</li>
+ * <li><b>parent.require</b> describes the constraint on the child output —
+ * computed inside the parent's {@code enforceAndDeriveLocalExchange} per child.</li>
+ * </ul>
+ *
+ * <h3>Invariants</h3>
+ * <ul>
+ * <li>Every serial → non-serial transition has an LE somewhere between them
+ * (enforced by framework step 3 below, validated by
+ * {@link AddLocalExchange#validateNoSerialWithoutLocalExchange}).</li>
Review Comment:
[P3] Please update this invariant after removing
`validateNoSerialWithoutLocalExchange`. Commit `3630071a` deleted that
validation because some serial-to-non-serial transitions legitimately have no
LocalExchange, but this bullet still says every transition has one and links to
the now-nonexistent method. Please rewrite it to describe the narrower
condition enforced by step 3, or remove the stale validation claim.
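
For illustration only: the Javadoc above already describes one situation in which no LocalExchange is inserted, namely when an ancestor in the same pipeline is already serial. A minimal sketch of that skip, modelled on the quoted `any_of(operators[idx..end], is_serial_operator)` check, might look like the following. The class, the `Op` type, and both method names are hypothetical and are not Doris code; the decision rule in `shouldInsertLocalExchange` is an assumption, and this may or may not be the case that motivated commit `3630071a`.

```java
import java.util.List;

// Hypothetical model for illustration; none of these names exist in Doris.
final class SerialAncestorSketch {

    // A pipeline operator reduced to a name and a serial flag.
    static final class Op {
        final String name;
        final boolean serial;

        Op(String name, boolean serial) {
            this.name = name;
            this.serial = serial;
        }
    }

    // True if any operator in pipeline[idx..end] is serial, in the spirit of
    // the any_of(operators[idx..end], is_serial_operator) check.
    static boolean hasSerialFrom(List<Op> pipeline, int idx) {
        return pipeline.subList(idx, pipeline.size()).stream().anyMatch(op -> op.serial);
    }

    // Assumed rule: insert an LE only when the requirement is not already
    // satisfied and no operator from idx onward is serial.
    static boolean shouldInsertLocalExchange(List<Op> pipeline, int idx,
                                             boolean requirementSatisfied) {
        return !requirementSatisfied && !hasSerialFrom(pipeline, idx);
    }
}
```

Under this model, a pipeline whose last operator is serial yields no LE at any earlier position even when the requirement is unmet, so an invariant phrased as "every transition has an LE" would not hold literally.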
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.