This is an automated email from the ASF dual-hosted git repository. huajianlan pushed a commit to branch fe_local_shuffle in repository https://gitbox.apache.org/repos/asf/doris.git
commit 22b78ddeed1764d8362303bf8102ba526b4db3e9 Author: 924060929 <[email protected]> AuthorDate: Fri Mar 27 18:54:54 2026 +0800 [refactor](local shuffle) simplify FE local exchange enforcement helpers - PlanNode: extract enforceChildExchange() helper for join/set-operation nodes to enforce child exchange without serial-ancestor check or heavy-ops avoidance; add javadoc to shouldResetSerialFlagForChild() - HashJoinNode/NestedLoopJoinNode: use enforceChildExchange() instead of inline deriveAndEnforceChildLocalExchange() + manual exchange insertion - SetOperationNode: merge duplicated Union/Intersect+Except child-enforcement loops into a single shared loop using enforceChildExchange(); also fixes set_intersect/set_except FE/BE consistency mismatches - AddLocalExchange: remove dead branches from shouldUseLocalExecutionHash() (the method always returns true, so no fragment/child context inspection is needed; its signature is unchanged) --- .../org/apache/doris/planner/AddLocalExchange.java | 16 +----- .../org/apache/doris/planner/HashJoinNode.java | 37 +++---------- .../apache/doris/planner/NestedLoopJoinNode.java | 32 ++---------- .../java/org/apache/doris/planner/PlanNode.java | 32 ++++++++++++ .../org/apache/doris/planner/SetOperationNode.java | 61 +++++----------------- 5 files changed, 57 insertions(+), 121 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java index 4b9ebcb53f6..d982ab80385 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java @@ -103,21 +103,7 @@ public class AddLocalExchange { private static boolean shouldUseLocalExecutionHash( PlanTranslatorContext translatorContext, PlanNode parent, PlanNode child) { - PlanFragment fragment = null; - if (parent != null) { - fragment = parent.getFragment(); - } - if (fragment == null && child != null) { - fragment = child.getFragment(); - } - - if (fragment != 
null && fragment.useSerialSource(translatorContext.getConnectContext())) { - return true; - } - if (child instanceof ScanNode) { - return true; - } - // For FE-planned intra-fragment hash exchanges, always prefer LOCAL_EXECUTION_HASH_SHUFFLE. + // Always prefer LOCAL_EXECUTION_HASH_SHUFFLE for FE-planned intra-fragment hash exchanges. // GLOBAL_EXECUTION_HASH_SHUFFLE requires shuffle_idx_to_instance_idx which may be empty // for fragments with non-hash sinks (UNPARTITIONED/MERGE). LOCAL_HASH is always safe // since it partitions by local instance count without needing external shuffle maps. diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java index 11eab9f9e22..1b90770f539 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java @@ -331,38 +331,13 @@ public class HashJoinNode extends JoinNodeBase { outputType = LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE; } - PlanNode probeSide = children.get(0); - Pair<PlanNode, LocalExchangeType> probeSideOutput = deriveAndEnforceChildLocalExchange( - translatorContext, probeSide, probeSideRequire, 0); - if (!probeSideRequire.satisfy(probeSideOutput.second)) { - LocalExchangeType preferType = AddLocalExchange.resolveExchangeType( - probeSideRequire, translatorContext, this, probeSideOutput.first); - probeSide = new LocalExchangeNode( - translatorContext.nextPlanNodeId(), probeSideOutput.first, preferType, - getChildDistributeExprList(0) - ); - } else { - probeSide = probeSideOutput.first; - } - - PlanNode buildSide = children.get(1); - Pair<PlanNode, LocalExchangeType> buildSideOutput = deriveAndEnforceChildLocalExchange( - translatorContext, buildSide, buildSideRequire, 1); - if (!buildSideRequire.satisfy(buildSideOutput.second)) { - LocalExchangeType preferType = AddLocalExchange.resolveExchangeType( - buildSideRequire, 
translatorContext, this, buildSideOutput.first); - buildSide = new LocalExchangeNode( - translatorContext.nextPlanNodeId(), buildSideOutput.first, preferType, - getChildDistributeExprList(1) - ); - } else { - buildSide = buildSideOutput.first; - } - - this.children = Lists.newArrayList(probeSide, buildSide); - + Pair<PlanNode, LocalExchangeType> probeResult = enforceChildExchange( + translatorContext, probeSideRequire, children.get(0), 0); + Pair<PlanNode, LocalExchangeType> buildResult = enforceChildExchange( + translatorContext, buildSideRequire, children.get(1), 1); + this.children = Lists.newArrayList(probeResult.first, buildResult.first); if (outputType == null) { - outputType = probeSideOutput.second; + outputType = probeResult.second; } return Pair.of(this, outputType); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java index fd9ac16ccf5..a82e78ee093 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java @@ -207,34 +207,10 @@ public class NestedLoopJoinNode extends JoinNodeBase { outputType = LocalExchangeType.ADAPTIVE_PASSTHROUGH; } - PlanNode probeSide = children.get(0); - Pair<PlanNode, LocalExchangeType> probeSideOutput = deriveAndEnforceChildLocalExchange( - translatorContext, probeSide, probeSideRequire, 0); - if (!probeSideRequire.satisfy(probeSideOutput.second)) { - LocalExchangeType preferType = AddLocalExchange.resolveExchangeType( - probeSideRequire, translatorContext, this, probeSideOutput.first); - probeSide = new LocalExchangeNode( - translatorContext.nextPlanNodeId(), probeSideOutput.first, preferType, - getChildDistributeExprList(0) - ); - } else { - probeSide = probeSideOutput.first; - } - - PlanNode buildSide = children.get(1); - Pair<PlanNode, LocalExchangeType> buildSideOutput = deriveAndEnforceChildLocalExchange( - 
translatorContext, buildSide, buildSideRequire, 1); - if (!buildSideRequire.satisfy(buildSideOutput.second)) { - LocalExchangeType preferType = AddLocalExchange.resolveExchangeType( - buildSideRequire, translatorContext, this, buildSideOutput.first); - buildSide = new LocalExchangeNode( - translatorContext.nextPlanNodeId(), buildSideOutput.first, preferType, - getChildDistributeExprList(1) - ); - } else { - buildSide = buildSideOutput.first; - } - + PlanNode probeSide = enforceChildExchange( + translatorContext, probeSideRequire, children.get(0), 0).first; + PlanNode buildSide = enforceChildExchange( + translatorContext, buildSideRequire, children.get(1), 1).first; this.children = Lists.newArrayList(probeSide, buildSide); return Pair.of(this, outputType); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index 2c3ab311e7d..a6059aef26b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -1017,6 +1017,38 @@ public abstract class PlanNode extends TreeNode<PlanNode> { return child.enforceAndDeriveLocalExchange(translatorContext, this, requireChild); } + /** + * Enforces a local exchange requirement on a single child without the serial-ancestor + * check or heavy-ops bottleneck avoidance that {@link #enforceChild} applies. + * Use for nodes whose children's distribution requirements must be satisfied regardless + * of serial ancestors in the same pipeline (joins, set operations, etc.). + * + * @return (resultNode, childOutputType) — resultNode may be a new LocalExchangeNode wrapper + * if an exchange was inserted; childOutputType is the child's reported output + * distribution before any inserted exchange (useful for deriving the parent's output). 
+ */ + protected Pair<PlanNode, LocalExchangeType> enforceChildExchange( + PlanTranslatorContext translatorContext, LocalExchangeTypeRequire require, + PlanNode child, int childIndex) { + Pair<PlanNode, LocalExchangeType> childOutput = deriveAndEnforceChildLocalExchange( + translatorContext, child, require, childIndex); + if (!require.satisfy(childOutput.second)) { + LocalExchangeType preferType = AddLocalExchange.resolveExchangeType( + require, translatorContext, this, childOutput.first); + return Pair.of( + new LocalExchangeNode(translatorContext.nextPlanNodeId(), childOutput.first, + preferType, getChildDistributeExprList(childIndex)), + childOutput.second); + } + return childOutput; + } + + /** + * Whether the child at {@code childIndex} starts a new pipeline context, causing + * its serial-ancestor flag to be reset to {@code false} rather than inherited from this node. + * Override to return {@code true} for pipeline-splitting nodes (LocalExchangeNode) and nodes + * whose children run in an independent pipeline segment (SortNode above analytic, etc.). + */ protected boolean shouldResetSerialFlagForChild(int childIndex) { return false; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java index 7343a74fd3f..bf1c2421e8c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java @@ -206,69 +206,36 @@ public abstract class SetOperationNode extends PlanNode { @Override public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(PlanTranslatorContext translatorContext, PlanNode parent, LocalExchangeTypeRequire parentRequire) { + LocalExchangeTypeRequire requireChild; + LocalExchangeType outputType; + PlanNode firstChild = children.isEmpty() ? 
null : children.get(0); if (this instanceof UnionNode) { - ArrayList<PlanNode> newChildren = Lists.newArrayList(); // Propagate parent's hash requirement to children when parent requires hash distribution. // Matches BE's UnionSinkOperatorX which returns GLOBAL_HASH(_distribute_exprs) whenever // _followed_by_shuffled_operator=true, regardless of whether _distribute_exprs is empty. boolean canPropagateHash = parentRequire.preferType().isHashShuffle(); - LocalExchangeTypeRequire requireChild = canPropagateHash - ? parentRequire : LocalExchangeTypeRequire.noRequire(); - LocalExchangeType outputType = canPropagateHash - ? AddLocalExchange.resolveExchangeType(requireChild, translatorContext, this, - children.isEmpty() ? null : children.get(0)) + requireChild = canPropagateHash ? parentRequire : LocalExchangeTypeRequire.noRequire(); + outputType = canPropagateHash + ? AddLocalExchange.resolveExchangeType(requireChild, translatorContext, this, firstChild) : LocalExchangeType.NOOP; - - for (int i = 0; i < children.size(); i++) { - PlanNode child = children.get(i); - Pair<PlanNode, LocalExchangeType> childOutput - = deriveAndEnforceChildLocalExchange(translatorContext, child, requireChild, i); - if (!requireChild.satisfy(childOutput.second)) { - LocalExchangeType preferType = AddLocalExchange.resolveExchangeType( - requireChild, translatorContext, this, childOutput.first); - LocalExchangeNode localExchangeNode - = new LocalExchangeNode(translatorContext.nextPlanNodeId(), childOutput.first, - preferType, getChildDistributeExprList(i)); - newChildren.add(localExchangeNode); - } else { - newChildren.add(childOutput.first); - } - } - - this.children = newChildren; - return Pair.of(this, outputType); } else { - LocalExchangeTypeRequire requireChild; - LocalExchangeType outputType; + // Intersect / Except if (AddLocalExchange.isColocated(this)) { requireChild = LocalExchangeTypeRequire.requireBucketHash(); outputType = LocalExchangeType.BUCKET_HASH_SHUFFLE; } else { requireChild 
= parentRequire.autoRequireHash(); outputType = AddLocalExchange.resolveExchangeType( - requireChild, translatorContext, this, children.isEmpty() ? null : children.get(0)); - } - - ArrayList<PlanNode> newChildren = Lists.newArrayList(); - for (int i = 0; i < children.size(); i++) { - PlanNode child = children.get(i); - Pair<PlanNode, LocalExchangeType> childOutput - = deriveAndEnforceChildLocalExchange(translatorContext, child, requireChild, i); - if (!requireChild.satisfy(childOutput.second)) { - LocalExchangeType preferType = AddLocalExchange.resolveExchangeType( - requireChild, translatorContext, this, childOutput.first); - LocalExchangeNode localExchangeNode - = new LocalExchangeNode(translatorContext.nextPlanNodeId(), childOutput.first, - preferType, getChildDistributeExprList(i)); - newChildren.add(localExchangeNode); - } else { - newChildren.add(childOutput.first); - } + requireChild, translatorContext, this, firstChild); } + } - this.children = newChildren; - return Pair.of(this, outputType); + ArrayList<PlanNode> newChildren = Lists.newArrayList(); + for (int i = 0; i < children.size(); i++) { + newChildren.add(enforceChildExchange(translatorContext, requireChild, children.get(i), i).first); } + this.children = newChildren; + return Pair.of(this, outputType); } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
