924060929 commented on code in PR #63366:
URL: https://github.com/apache/doris/pull/63366#discussion_r3413407415
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java:
##########
@@ -109,10 +109,36 @@ public class PlanTranslatorContext {
private final Map<ScanNode, Set<SlotId>> statsUnknownColumnsMap =
Maps.newHashMap();
+ // Per-node "is there a serial operator between me and the pipeline's sink" flag.
+ // Mirrors BE's any_of(operators[idx..end], is_serial_operator) check used by
+ // _add_local_exchange / need_to_local_exchange to skip LE insertion when an ancestor
+ // in the same pipeline is already serial (the whole pipeline runs with 1 task, so an
+ // extra LE would be a no-op). Written by AddLocalExchange entry + PlanNode.enforceRequire
+ // step 1 (root → leaf during traversal). Read by PlanNode.enforceRequire step 4 (Layer 1
+ // skip) and by child overrides that compute their require. Reset to false at fragment
+ // root and across pipeline boundaries (see shouldResetSerialFlagForChild).
+ private final Map<PlanNodeId, Boolean> serialAncestorInPipelineMap = Maps.newHashMap();
+
+ // Per-node "is there a downstream operator that depends on hash distribution for
+ // correctness, with HASH/NOOP path connecting it to me" flag. Mirrors BE's
+ // _followed_by_shuffled_operator propagation in pipeline_fragment_context.cpp.
+ // Written by PlanNode.enforceRequire step 1b (root → leaf). Read by SetOperationNode
+ // to decide whether to propagate hash requirement to its inputs (only when downstream
+ // needs shuffle for correctness, not just for performance like StreamingAgg pre-agg).
+ private final Map<PlanNodeId, Boolean> shuffledAncestorMap = Maps.newHashMap();
+
+ // Whether the current fragment uses LocalShuffleAssignedJob (pooling scan with
+ // ignoreDataDistribution → _parallel_instances=1 in BE). When true, serial operators
+ // indicate real pipeline bottlenecks needing PASSTHROUGH fan-out (heavy_ops).
private boolean isTopMaterializeNode = true;
private final Set<SlotId> virtualColumnIds = Sets.newHashSet();
+ // Used by AddLocalExchange: tracks whether any serial operator exists
Review Comment:
Done in `fe9a3316be0` — removed the field and its getter/setter. The live
serial-ancestor propagation uses `serialAncestorInPipelineMap` (set at
`AddLocalExchange:94` / `PlanNode:1108`), so this was indeed dead.
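The root → leaf propagation that `serialAncestorInPipelineMap` records can be sketched as follows. This is a simplified model, not the Doris code: `SerialNode`, its `serial` and `pipelineBoundaryAbove` fields, and `SerialAncestorSketch` are hypothetical, and `pipelineBoundaryAbove` stands in for whatever `shouldResetSerialFlagForChild` decides. Here the flag means "some strict ancestor in the same pipeline is serial".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, stripped-down plan node carrying only what the sketch needs.
class SerialNode {
    final int id;
    final boolean serial;                // operator runs with a single task
    final boolean pipelineBoundaryAbove; // a pipeline boundary separates this node from its parent
    final List<SerialNode> children = new ArrayList<>();

    SerialNode(int id, boolean serial, boolean pipelineBoundaryAbove, SerialNode... kids) {
        this.id = id;
        this.serial = serial;
        this.pipelineBoundaryAbove = pipelineBoundaryAbove;
        for (SerialNode kid : kids) {
            children.add(kid);
        }
    }
}

class SerialAncestorSketch {
    // Returns node id -> "a serial operator sits above me in my pipeline".
    static Map<Integer, Boolean> propagate(SerialNode root) {
        Map<Integer, Boolean> flags = new HashMap<>();
        visit(root, false, flags); // the flag starts as false at the fragment root
        return flags;
    }

    private static void visit(SerialNode node, boolean serialAbove, Map<Integer, Boolean> flags) {
        flags.put(node.id, serialAbove);
        for (SerialNode child : node.children) {
            // Inside one pipeline the flag accumulates root -> leaf; across a boundary
            // it resets, because the child pipeline gets its own task count.
            boolean childFlag = !child.pipelineBoundaryAbove && (serialAbove || node.serial);
            visit(child, childFlag, flags);
        }
    }
}
```

With a serial root, its same-pipeline child sees the flag, while a node behind a pipeline boundary starts again from false.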
##########
fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java:
##########
@@ -939,4 +1026,261 @@ private String mergeIcebergAccessPathsWithId(
}
return StringUtils.join(mergeDisplayAccessPaths, ", ");
}
+
+ public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(
+ PlanTranslatorContext translatorContext, PlanNode parent, LocalExchangeTypeRequire parentRequire) {
+ ArrayList<PlanNode> newChildren = Lists.newArrayList();
+ for (int i = 0; i < children.size(); i++) {
+ Pair<PlanNode, LocalExchangeType> childOutput
+ = enforceRequire(translatorContext, children.get(i), i, LocalExchangeTypeRequire.noRequire());
+ newChildren.add(childOutput.first);
+ }
+ this.children = newChildren;
+ return Pair.of(this, LocalExchangeType.NOOP);
+ }
+
+ /**
+ * Unified framework method: propagate serial flag → recurse child → satisfy check → Layer 1 skip → insert LE.
+ * Replaces the old enforceChild/enforceChildExchange/forceEnforceChildExchange trio.
+ *
+ * <h3>Data flow</h3>
+ * <ul>
+ * <li><b>serial-ancestor flag</b> ({@link PlanTranslatorContext#hasSerialAncestorInPipeline})
+ * — flows root → leaf during traversal. Mirrors BE's
+ * {@code any_of(operators[idx..end], is_serial_operator)} check used by
+ * {@code _add_local_exchange} to skip LE insertion when an ancestor in the same
+ * pipeline is already serial. Reset at pipeline boundaries via
+ * {@link #shouldResetSerialFlagForChild}.</li>
+ * <li><b>shuffle-for-correctness flag</b>
+ * ({@link PlanTranslatorContext#hasShuffleForCorrectnessAncestor}) — also flows
+ * root → leaf. Mirrors BE's {@code _followed_by_shuffled_operator}: tells a
+ * child whether some downstream operator depends on hash distribution for
+ * correctness, so {@code SetOperationNode} can pre-shuffle union branches.</li>
+ * <li><b>return value</b> {@code Pair<PlanNode, LocalExchangeType>} — first is the
+ * (possibly LE-wrapped) child; second is the actual output distribution as
+ * observed by the parent. Caller's {@code require.satisfy(output)} decides
+ * whether more LE is needed.</li>
+ * <li><b>parent.require</b> describes the constraint on the child output —
+ * computed inside the parent's {@code enforceAndDeriveLocalExchange} per child.</li>
+ * </ul>
+ *
+ * <h3>Invariants</h3>
+ * <ul>
+ * <li>Every serial → non-serial transition has an LE somewhere between them
+ * (enforced by framework step 3 below, validated by
+ * {@link AddLocalExchange#validateNoSerialWithoutLocalExchange}).</li>
Review Comment:
Done in `fe9a3316be0`. Removed the dead `@link` to
`validateNoSerialWithoutLocalExchange` and rewrote the bullet to state the narrower,
accurate invariant: framework step 3 inserts an LE only where a serial→non-serial
transition actually needs redistribution. A serial child feeding a parent that
requires PASSTHROUGH/noRequire (TableFunction, NLJ, Agg) is already correct and needs
no LE, so this case is intentionally not validated by a post-pass.
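The decision order in `enforceRequire` (satisfy check, then Layer 1 skip, then insert an LE) can be sketched in isolation. Everything here is hypothetical and simplified: `LeType` stands in for `LocalExchangeType` with only three members, `LeRequire` models a requirement as the set of accepted child outputs plus the type an inserted LE would produce, and `EnforceResult` mirrors the role of the `Pair<PlanNode, LocalExchangeType>` return value.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-in for LocalExchangeType, trimmed to three members.
enum LeType { NOOP, PASSTHROUGH, HASH_SHUFFLE }

// Hypothetical requirement: which child outputs are acceptable, and what an LE would produce.
final class LeRequire {
    final Set<LeType> accepted;
    final LeType target;

    private LeRequire(Set<LeType> accepted, LeType target) {
        this.accepted = accepted;
        this.target = target;
    }

    static LeRequire noRequire() {
        return new LeRequire(EnumSet.allOf(LeType.class), LeType.NOOP);
    }

    static LeRequire hash() {
        return new LeRequire(EnumSet.of(LeType.HASH_SHUFFLE), LeType.HASH_SHUFFLE);
    }

    boolean satisfy(LeType output) {
        return accepted.contains(output);
    }
}

// Was the child wrapped in an LE, and what distribution does the parent observe?
final class EnforceResult {
    final boolean insertedLocalExchange;
    final LeType output;

    EnforceResult(boolean insertedLocalExchange, LeType output) {
        this.insertedLocalExchange = insertedLocalExchange;
        this.output = output;
    }
}

final class EnforceSketch {
    static EnforceResult enforce(LeRequire require, LeType childOutput, boolean serialAncestorInPipeline) {
        // Satisfy check: the child's actual output already meets the requirement.
        if (require.satisfy(childOutput)) {
            return new EnforceResult(false, childOutput);
        }
        // Layer 1 skip: a serial ancestor in this pipeline means it runs with one task,
        // so an extra LE would be a no-op.
        if (serialAncestorInPipeline) {
            return new EnforceResult(false, childOutput);
        }
        // Otherwise wrap the child in an LE; the parent now observes the LE's output.
        return new EnforceResult(true, require.target);
    }
}
```

In this model a `noRequire()` parent accepts any child output, which is why a serial child feeding such a parent needs no LE.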
##########
fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java:
##########
Review Comment:
Done in `fe9a3316be0` (same fix as the other thread on this line): the
`@link` to the removed `validateNoSerialWithoutLocalExchange` is gone and the
invariant is reworded to the narrower condition rather than claiming every
serial→non-serial transition has an LE.
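The shuffle-for-correctness flag follows the same root → leaf shape, but it is raised only by operators that need hash distribution for correctness and survives only along HASH/NOOP edges. A minimal sketch under those assumptions; `EdgeKind`, `ShuffleNode`, and the `needsHashForCorrectness` field are hypothetical, and which real operators set that field is not modeled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ShuffleFlagSketch {
    // Hypothetical kind of local exchange (or none) between a parent and a child.
    enum EdgeKind { NOOP, HASH, PASSTHROUGH }

    static class ShuffleNode {
        final String name;
        final boolean needsHashForCorrectness; // e.g. a hash join, but not a streaming pre-agg
        final EdgeKind edgeFromParent;
        final List<ShuffleNode> children;

        ShuffleNode(String name, boolean needsHash, EdgeKind edge, ShuffleNode... kids) {
            this.name = name;
            this.needsHashForCorrectness = needsHash;
            this.edgeFromParent = edge;
            this.children = new ArrayList<>(Arrays.asList(kids));
        }
    }

    // Returns node name -> "a downstream operator needs hash distribution for correctness
    // and is connected to me only through HASH/NOOP edges".
    static Map<String, Boolean> propagate(ShuffleNode root) {
        Map<String, Boolean> flags = new HashMap<>();
        visit(root, false, flags);
        return flags;
    }

    private static void visit(ShuffleNode node, boolean flag, Map<String, Boolean> flags) {
        flags.put(node.name, flag);
        for (ShuffleNode child : node.children) {
            // A PASSTHROUGH edge breaks the path; HASH and NOOP keep it intact.
            boolean pathIntact = child.edgeFromParent != EdgeKind.PASSTHROUGH;
            visit(child, pathIntact && (flag || node.needsHashForCorrectness), flags);
        }
    }
}
```

A set-operation node would then read its own flag to decide whether to push a hash requirement down to its inputs; a streaming pre-aggregation parent does not raise the flag, since it wants hashing only for performance.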
##########
fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java:
##########
@@ -142,7 +145,8 @@ public abstract class PlanNode extends TreeNode<PlanNode> {
protected int nereidsId = -1;
- private List<List<Expr>> childrenDistributeExprLists = new ArrayList<>();
+ protected List<List<Expr>> childrenDistributeExprLists = new ArrayList<>();
+ protected List<Expr> distributeExprLists = new ArrayList<>();
Review Comment:
Done in `fe9a3316be0` — added comments: `childrenDistributeExprLists.get(i)`
is the per-child hash-key expr list used to repartition the i-th child's input;
`distributeExprLists` is this node's own output hash-key exprs serialized to BE.
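To illustrate why a node keeps one hash-key list per child: for a hash join, each side must be repartitioned on its own join keys, with the same hash function and the same number of local channels, so that equal keys from both sides land in the same channel. A minimal sketch, assuming rows are `Object[]`, key expressions are reduced to column indexes, and `Arrays.hashCode` stands in for the BE's hash function (`LocalHashRouting` and `channelOf` are hypothetical):

```java
import java.util.Arrays;

class LocalHashRouting {
    // Pick a local channel for a row by hashing the columns named in its key list.
    // keyColumns plays the role of childrenDistributeExprLists.get(i) for child i.
    static int channelOf(Object[] row, int[] keyColumns, int numChannels) {
        Object[] key = new Object[keyColumns.length];
        for (int i = 0; i < keyColumns.length; i++) {
            key[i] = row[keyColumns[i]];
        }
        // floorMod keeps the result in [0, numChannels) even for negative hashes.
        return Math.floorMod(Arrays.hashCode(key), numChannels);
    }
}
```

Here the left row keys on column 0 and the right row on column 1; because both children hash their own key columns the same way into the same channel count, matching values meet in one channel.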
##########
be/src/exec/pipeline/pipeline_fragment_context.cpp:
##########
@@ -1796,6 +2011,89 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
break;
}
+ case TPlanNodeType::LOCAL_EXCHANGE_NODE: {
+ op = std::make_shared<LocalExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs);
+ // The downstream pipeline (containing LocalExchangeSource) must have
+ // _num_instances tasks — matching BE-native _inherit_pipeline_properties
+ // which sets pipe_with_source.set_num_tasks(_num_instances).
+ // Without this, when the parent pipeline was reduced by a serial operator
+ // (e.g., serial Exchange with use_serial_exchange=true, or UNPARTITIONED
+ // Exchange), the downstream inherits the reduced num_tasks via
+ // add_pipeline(parent). The deferred exchanger creates _num_instances
+ // channels but only fewer source tasks initialize mem_counters — the
+ // sink round-robins to all channels and crashes on uninitialized ones.
+ auto downstream_num_tasks = _num_instances;
Review Comment:
Done in `fe9a3316be0` — inlined to `cur_pipe->set_num_tasks(_num_instances)`.
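The crash described in the quoted comment comes from a count mismatch: the exchanger sizes its channels by `_num_instances`, but per-channel state is only set up by the source tasks that actually exist. A toy model, assuming one source task initializes exactly one channel and the sink round-robins rows over all channels (`RoundRobinSketch` and `isSafe` are hypothetical, not BE code):

```java
class RoundRobinSketch {
    // Returns false as soon as the sink would write to a channel that no source task
    // initialized, which is where the BE would touch uninitialized state.
    static boolean isSafe(int numChannels, int numSourceTasks, int rowsToSend) {
        boolean[] initialized = new boolean[numChannels];
        // Source task t sets up the per-channel state (e.g. memory counters) of channel t.
        for (int t = 0; t < Math.min(numSourceTasks, numChannels); t++) {
            initialized[t] = true;
        }
        for (int row = 0; row < rowsToSend; row++) {
            if (!initialized[row % numChannels]) {
                return false;
            }
        }
        return true;
    }
}
```

Forcing the downstream pipeline to `_num_instances` tasks makes the two counts equal, which is the condition `isSafe(4, 4, ...)` models and `isSafe(4, 1, ...)` violates.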
##########
be/src/exec/pipeline/pipeline_fragment_context.cpp:
##########
@@ -690,6 +716,188 @@ Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const Descrip
return Status::OK();
}
+Status PipelineFragmentContext::_create_deferred_local_exchangers() {
+ for (auto& info : _deferred_exchangers) {
+ // DANGER ZONE — do not "fix" this line without reading the history.
+ //
+ // sender_count seeds Exchanger::_running_sink_operators, which the source side
+ // waits to reach 0 via sub_running_sink_operators on each sink LocalState close.
+ // The correct value is THIS pipeline-instance's sink task count, which is exactly
+ // info.upstream_pipe->num_tasks() — one PipelineTask per task, one close per task.
+ //
+ // Tempting wrong fix #1: `std::max(num_tasks, _num_instances)` to mirror the
+ // BE-planned path in _add_local_exchange_impl (~line 1023). THIS BREAKS the
+ // common FE-planned shape of `serial scan → LE(PT) → ...`: upstream_pipe
+ // genuinely has num_tasks=1, only 1 close arrives, but seed becomes
+ // _num_instances so _running_sink_operators never reaches 0 — downstream
+ // sources hang on SHUFFLE_DATA_DEPENDENCY (e.g. MTMV refresh from
+ // mtmv_up_down_job_p0/load.groovy stays at Status=RUNNING; build 949402
Review Comment:
Done in `fe9a3316be0` — dropped the internal build id from the comment (kept
the test reference).
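The hang the DANGER ZONE comment warns about is a countdown seeded higher than the number of decrements that will ever arrive. A minimal model, assuming one close per sink task and a source that may finish only when the counter hits zero; `SinkCountdown` is hypothetical and uses `AtomicInteger` in place of the BE's counter:

```java
import java.util.concurrent.atomic.AtomicInteger;

class SinkCountdown {
    private final AtomicInteger runningSinks;

    SinkCountdown(int senderCount) {
        // Plays the role of seeding _running_sink_operators with sender_count.
        this.runningSinks = new AtomicInteger(senderCount);
    }

    void onSinkClose() {
        runningSinks.decrementAndGet(); // one decrement per sink task close
    }

    boolean sourceMayFinish() {
        return runningSinks.get() == 0;
    }

    // Seed the counter, close `actualSinkTasks` sinks, and report whether the source unblocks.
    static boolean unblocks(int seed, int actualSinkTasks) {
        SinkCountdown exchanger = new SinkCountdown(seed);
        for (int i = 0; i < actualSinkTasks; i++) {
            exchanger.onSinkClose();
        }
        return exchanger.sourceMayFinish();
    }
}
```

With one upstream sink task and eight instances, seeding with the upstream task count (1) unblocks the source, while seeding with `max(num_tasks, _num_instances)` leaves 7 outstanding and the source waits forever.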
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]