924060929 commented on code in PR #63366:
URL: https://github.com/apache/doris/pull/63366#discussion_r3413407415


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java:
##########
@@ -109,10 +109,36 @@ public class PlanTranslatorContext {
 
     private final Map<ScanNode, Set<SlotId>> statsUnknownColumnsMap = Maps.newHashMap();
 
+    // Per-node "is there a serial operator between me and the pipeline's sink" flag.
+    // Mirrors BE's any_of(operators[idx..end], is_serial_operator) check used by
+    // _add_local_exchange / need_to_local_exchange to skip LE insertion when an ancestor
+    // in the same pipeline is already serial (the whole pipeline runs with 1 task, so an
+    // extra LE would be a no-op).  Written by AddLocalExchange entry + PlanNode.enforceRequire
+    // step 1 (root → leaf during traversal).  Read by PlanNode.enforceRequire step 4 (Layer 1
+    // skip) and by child overrides that compute their require.  Reset to false at fragment
+    // root and across pipeline boundaries (see shouldResetSerialFlagForChild).
+    private final Map<PlanNodeId, Boolean> serialAncestorInPipelineMap = Maps.newHashMap();
+
+    // Per-node "is there a downstream operator that depends on hash distribution for
+    // correctness, with HASH/NOOP path connecting it to me" flag.  Mirrors BE's
+    // _followed_by_shuffled_operator propagation in pipeline_fragment_context.cpp.
+    // Written by PlanNode.enforceRequire step 1b (root → leaf).  Read by SetOperationNode
+    // to decide whether to propagate hash requirement to its inputs (only when downstream
+    // needs shuffle for correctness, not just for performance like StreamingAgg pre-agg).
+    private final Map<PlanNodeId, Boolean> shuffledAncestorMap = Maps.newHashMap();
+
+    // Whether the current fragment uses LocalShuffleAssignedJob (pooling scan with
+    // ignoreDataDistribution → _parallel_instances=1 in BE). When true, serial operators
+    // indicate real pipeline bottlenecks needing PASSTHROUGH fan-out (heavy_ops).
     private boolean isTopMaterializeNode = true;
 
     private final Set<SlotId> virtualColumnIds = Sets.newHashSet();
 
+    // Used by AddLocalExchange: tracks whether any serial operator exists

Review Comment:
   Done in `fe9a3316be0` — removed the field and its getter/setter. The live 
serial-ancestor propagation uses `serialAncestorInPipelineMap` (set at 
`AddLocalExchange:94` / `PlanNode:1108`), so this was indeed dead.
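   The root-to-leaf propagation that `serialAncestorInPipelineMap` records can be sketched as a simple tree walk. This is a minimal Java sketch under stated assumptions: `SketchNode`, `startsNewPipeline`, and `SerialFlagPropagation` are hypothetical names, a node's own `serial` bit is assumed to affect only its descendants, and a pipeline boundary is modeled as a per-child boolean rather than Doris's actual `shouldResetSerialFlagForChild` logic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified plan node for illustration; not Doris's PlanNode.
class SketchNode {
    final int id;
    final boolean serial;            // e.g. a serial exchange or a serial scan
    final boolean startsNewPipeline; // hypothetical: the edge from the parent crosses a pipeline boundary
    final List<SketchNode> children = new ArrayList<>();

    SketchNode(int id, boolean serial, boolean startsNewPipeline) {
        this.id = id;
        this.serial = serial;
        this.startsNewPipeline = startsNewPipeline;
    }

    SketchNode add(SketchNode child) {
        children.add(child);
        return this;
    }
}

class SerialFlagPropagation {
    // nodeId -> "some ancestor in my pipeline is serial", in the spirit of serialAncestorInPipelineMap.
    static Map<Integer, Boolean> propagate(SketchNode root) {
        Map<Integer, Boolean> flags = new HashMap<>();
        visit(root, false, flags); // the flag starts as false at the fragment root
        return flags;
    }

    private static void visit(SketchNode node, boolean serialAncestor, Map<Integer, Boolean> flags) {
        flags.put(node.id, serialAncestor);
        for (SketchNode child : node.children) {
            // Crossing a pipeline boundary resets the flag (cf. shouldResetSerialFlagForChild);
            // otherwise it accumulates root -> leaf.
            boolean childFlag = !child.startsNewPipeline && (serialAncestor || node.serial);
            visit(child, childFlag, flags);
        }
    }
}
```

   For a serial node with one child in the same pipeline and one child that starts a new pipeline, the first child sees `true` and the second sees `false`, which is the case where the LE skip must not leak across the boundary.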



##########
fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java:
##########
@@ -939,4 +1026,261 @@ private String mergeIcebergAccessPathsWithId(
         }
         return StringUtils.join(mergeDisplayAccessPaths, ", ");
     }
+
+    public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(
+            PlanTranslatorContext translatorContext, PlanNode parent, LocalExchangeTypeRequire parentRequire) {
+        ArrayList<PlanNode> newChildren = Lists.newArrayList();
+        for (int i = 0; i < children.size(); i++) {
+            Pair<PlanNode, LocalExchangeType> childOutput
+                    = enforceRequire(translatorContext, children.get(i), i, LocalExchangeTypeRequire.noRequire());
+            newChildren.add(childOutput.first);
+        }
+        this.children = newChildren;
+        return Pair.of(this, LocalExchangeType.NOOP);
+    }
+
+    /**
+     * Unified framework method: propagate serial flag → recurse child → satisfy check → Layer 1 skip → insert LE.
+     * Replaces the old enforceChild/enforceChildExchange/forceEnforceChildExchange trio.
+     *
+     * <h3>Data flow</h3>
+     * <ul>
+     *   <li><b>serial-ancestor flag</b> ({@link PlanTranslatorContext#hasSerialAncestorInPipeline})
+     *       — flows root → leaf during traversal.  Mirrors BE's
+     *       {@code any_of(operators[idx..end], is_serial_operator)} check used by
+     *       {@code _add_local_exchange} to skip LE insertion when an ancestor in the same
+     *       pipeline is already serial.  Reset at pipeline boundaries via
+     *       {@link #shouldResetSerialFlagForChild}.</li>
+     *   <li><b>shuffle-for-correctness flag</b>
+     *       ({@link PlanTranslatorContext#hasShuffleForCorrectnessAncestor}) — also flows
+     *       root → leaf.  Mirrors BE's {@code _followed_by_shuffled_operator}: tells a
+     *       child whether some downstream operator depends on hash distribution for
+     *       correctness, so {@code SetOperationNode} can pre-shuffle union branches.</li>
+     *   <li><b>return value</b> {@code Pair<PlanNode, LocalExchangeType>} — first is the
+     *       (possibly LE-wrapped) child; second is the actual output distribution as
+     *       observed by the parent.  Caller's {@code require.satisfy(output)} decides
+     *       whether more LE is needed.</li>
+     *   <li><b>parent.require</b> describes the constraint on the child output —
+     *       computed inside the parent's {@code enforceAndDeriveLocalExchange} per child.</li>
+     * </ul>
+     *
+     * <h3>Invariants</h3>
+     * <ul>
+     *   <li>Every serial → non-serial transition has an LE somewhere between them
+     *       (enforced by framework step 3 below, validated by
+     *       {@link AddLocalExchange#validateNoSerialWithoutLocalExchange}).</li>

Review Comment:
   Done in `fe9a3316be0`. Removed the dead `@link` to 
`validateNoSerialWithoutLocalExchange` and rewrote the bullet to the narrower 
truth: framework step 3 inserts the LE only where a serial→non-serial 
transition actually needs redistribution; a serial child feeding a parent that 
requires PASSTHROUGH/noRequire (TableFunction, NLJ, Agg) is already correct and 
needs none — so it is intentionally not validated by a post-pass.



Review Comment:
   Done in `fe9a3316be0` (same fix as the other thread on this line): the 
`@link` to the removed `validateNoSerialWithoutLocalExchange` is gone and the 
invariant is reworded to the narrower condition rather than claiming every 
serial→non-serial transition has an LE.
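   The per-child step order in the Javadoc (satisfy check, then Layer 1 skip, then insert LE) can be sketched as follows. The Java types here (`ExchangeType`, `Require`, `Enforced`, `EnforceSketch`) are hypothetical stand-ins for `LocalExchangeType`, `LocalExchangeTypeRequire`, and the `Pair` returned by `enforceRequire`; the recursive call on the child is abstracted away by passing in its plan and output, and the plan is a string purely for readability.

```java
import java.util.Objects;

// Hypothetical, simplified stand-in for LocalExchangeType.
enum ExchangeType { NOOP, PASSTHROUGH, HASH }

// Hypothetical stand-in for LocalExchangeTypeRequire: null means "no requirement".
final class Require {
    private final ExchangeType required;

    private Require(ExchangeType required) { this.required = required; }

    static Require noRequire() { return new Require(null); }

    static Require of(ExchangeType t) { return new Require(Objects.requireNonNull(t)); }

    boolean satisfy(ExchangeType output) { return required == null || required == output; }

    ExchangeType target() { return required; }
}

// Result of enforcing a requirement on one child: the (possibly LE-wrapped) child and its output.
final class Enforced {
    final String plan;
    final ExchangeType output;

    Enforced(String plan, ExchangeType output) {
        this.plan = plan;
        this.output = output;
    }
}

final class EnforceSketch {
    // childPlan/childOutput: what the recursive call on the child returned.
    static Enforced enforceRequire(String childPlan, ExchangeType childOutput,
                                   Require require, boolean serialAncestorInPipeline) {
        if (require.satisfy(childOutput)) {
            return new Enforced(childPlan, childOutput); // already satisfied: no LE
        }
        if (serialAncestorInPipeline) {
            // "Layer 1 skip": the pipeline runs with one task, so an extra LE would be a no-op.
            return new Enforced(childPlan, childOutput);
        }
        ExchangeType target = require.target();
        return new Enforced("LE(" + target + ")<-" + childPlan, target); // insert an LE
    }
}
```

   In the sketch, a `noRequire()` parent always gets the child back unchanged, and an unsatisfied hash requirement under a serial ancestor is skipped rather than wrapped.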



##########
fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java:
##########
@@ -142,7 +145,8 @@ public abstract class PlanNode extends TreeNode<PlanNode> {
 
     protected int nereidsId = -1;
 
-    private List<List<Expr>> childrenDistributeExprLists = new ArrayList<>();
+    protected List<List<Expr>> childrenDistributeExprLists = new ArrayList<>();
+    protected List<Expr> distributeExprLists = new ArrayList<>();

Review Comment:
   Done in `fe9a3316be0` — added comments: `childrenDistributeExprLists.get(i)` 
is the per-child hash-key expr list used to repartition the i-th child's input; 
`distributeExprLists` is this node's own output hash-key exprs serialized to BE.



##########
be/src/exec/pipeline/pipeline_fragment_context.cpp:
##########
@@ -1796,6 +2011,89 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
         break;
     }
+    case TPlanNodeType::LOCAL_EXCHANGE_NODE: {
+        op = std::make_shared<LocalExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs);
+        // The downstream pipeline (containing LocalExchangeSource) must have
+        // _num_instances tasks — matching BE-native _inherit_pipeline_properties
+        // which sets pipe_with_source.set_num_tasks(_num_instances).
+        // Without this, when the parent pipeline was reduced by a serial operator
+        // (e.g., serial Exchange with use_serial_exchange=true, or UNPARTITIONED
+        // Exchange), the downstream inherits the reduced num_tasks via
+        // add_pipeline(parent).  The deferred exchanger creates _num_instances
+        // channels but only fewer source tasks initialize mem_counters — the
+        // sink round-robins to all channels and crashes on uninitialized ones.
+        auto downstream_num_tasks = _num_instances;

Review Comment:
   Done in `fe9a3316be0` — inlined to `cur_pipe->set_num_tasks(_num_instances)`.
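   The crash described in the `LOCAL_EXCHANGE_NODE` comment comes from a mismatch between the number of exchanger channels and the number of source tasks that initialize them. This hypothetical Java model illustrates only that mismatch: `RoundRobinExchangerSketch` is not a Doris class, a boolean per channel stands in for the per-channel mem_counters, and `sinkRoundRobin` returns `false` where BE would touch an uninitialized channel.

```java
// Hypothetical model of a deferred local exchanger; names are illustrative only.
final class RoundRobinExchangerSketch {
    private final boolean[] initialized; // stands in for per-channel mem_counters

    RoundRobinExchangerSketch(int numChannels) {
        initialized = new boolean[numChannels]; // one channel per instance (_num_instances)
    }

    // Each downstream source task initializes the channel it reads from.
    void initSourceTasks(int numSourceTasks) {
        for (int task = 0; task < numSourceTasks && task < initialized.length; task++) {
            initialized[task] = true;
        }
    }

    // The sink distributes blocks round-robin over ALL channels.
    // Returns false at the first block that lands on an uninitialized channel.
    boolean sinkRoundRobin(int numBlocks) {
        for (int block = 0; block < numBlocks; block++) {
            if (!initialized[block % initialized.length]) {
                return false;
            }
        }
        return true;
    }
}
```

   With 4 channels, initializing 4 source tasks lets every block land safely; initializing only 1 (the inherited, reduced `num_tasks`) fails on the second block, which is why the downstream pipeline is pinned to `_num_instances` tasks.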



##########
be/src/exec/pipeline/pipeline_fragment_context.cpp:
##########
@@ -690,6 +716,188 @@ Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const Descrip
     return Status::OK();
 }
 
+Status PipelineFragmentContext::_create_deferred_local_exchangers() {
+    for (auto& info : _deferred_exchangers) {
+        // DANGER ZONE — do not "fix" this line without reading the history.
+        //
+        // sender_count seeds Exchanger::_running_sink_operators, which the source side
+        // waits to reach 0 via sub_running_sink_operators on each sink LocalState close.
+        // The correct value is THIS pipeline-instance's sink task count, which is exactly
+        // info.upstream_pipe->num_tasks() — one PipelineTask per task, one close per task.
+        //
+        // Tempting wrong fix #1: `std::max(num_tasks, _num_instances)` to mirror the
+        //   BE-planned path in _add_local_exchange_impl (~line 1023).  THIS BREAKS the
+        //   common FE-planned shape of `serial scan → LE(PT) → ...`: upstream_pipe
+        //   genuinely has num_tasks=1, only 1 close arrives, but seed becomes
+        //   _num_instances so _running_sink_operators never reaches 0 — downstream
+        //   sources hang on SHUFFLE_DATA_DEPENDENCY (e.g. MTMV refresh from
+        //   mtmv_up_down_job_p0/load.groovy stays at Status=RUNNING; build 949402

Review Comment:
   Done in `fe9a3316be0` — dropped the internal build id from the comment (kept 
the test reference).
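   The hang that the DANGER ZONE comment warns about is a counting argument: the seed must equal the number of sink closes that will actually arrive. A minimal Java model, assuming exactly one decrement per upstream sink task close; `RunningSinkCounterSketch` is hypothetical and only mirrors the role of `Exchanger::_running_sink_operators` and `sub_running_sink_operators`.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of Exchanger::_running_sink_operators; not the Doris C++ class.
final class RunningSinkCounterSketch {
    private final AtomicInteger runningSinkOperators;

    RunningSinkCounterSketch(int senderCount) {
        runningSinkOperators = new AtomicInteger(senderCount); // the "sender_count" seed
    }

    // Called once per sink PipelineTask close (cf. sub_running_sink_operators).
    void onSinkClose() {
        runningSinkOperators.decrementAndGet();
    }

    // Sources may finish only after every expected sink has closed.
    boolean sourcesCanFinish() {
        return runningSinkOperators.get() == 0;
    }

    // Seed with senderCount, then deliver one close per upstream sink task.
    static boolean simulate(int senderCount, int upstreamSinkTasks) {
        RunningSinkCounterSketch counter = new RunningSinkCounterSketch(senderCount);
        for (int i = 0; i < upstreamSinkTasks; i++) {
            counter.onSinkClose();
        }
        return counter.sourcesCanFinish();
    }
}
```

   With `num_tasks = 1` and `_num_instances = 8`, seeding with `num_tasks` drains to 0, while seeding with `std::max(num_tasks, _num_instances)` leaves the counter at 7, so the sources never finish.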



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

