924060929 commented on code in PR #63366:
URL: https://github.com/apache/doris/pull/63366#discussion_r3413413674


##########
fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java:
##########
@@ -733,7 +737,7 @@ public ScanContext getScanContext() {
     }
 
     @Override
-    public boolean isSerialOperator() {
+    public boolean isSerialNode() {
         ConnectContext context = ConnectContext.get();
         if (context == null) {
             return numScanBackends() <= 0;

Review Comment:
   I don't think this is a regression — I checked the actual before/after. 
Master's `ScanNode.isSerialOperator()` body is byte-identical to this PR's 
`isSerialNode()` (both: `numScanBackends() <= 0 || getScanRangeNum() < 
parallelExecInstanceNum * numScanBackends() || isForceToLocalShuffle()`); 
neither version ever contained `fragment.useSerialSource(context)`. At the 
decision point (`Coordinator.assignScanRanges`, ~`Coordinator.java:2995`) 
master called `isSerialOperator()` and this PR calls `isSerialNode()` — same 
body, pure rename.
   
   The `useSerialSource()` gate that matters there is in the *caller* 
(`ignoreStorageDataDistribution`, ~`Coordinator.java:2939`), which is 
unchanged. So the legacy scan-range assignment behaves exactly as on master. 
Could you point me at the specific old code where 
`isSerialNode()`/`isSerialOperator()` was `numScanBackends() <= 0 || 
fragment.useSerialSource(context)`? I couldn't find that shape in master.



##########
fe/fe-core/src/main/java/org/apache/doris/planner/AssertNumRowsNode.java:
##########
@@ -78,7 +84,17 @@ public int getNumInstances() {
     }
 
     @Override
-    public boolean isSerialOperator() {
+    public boolean isSerialNode() {
         return true;
     }
+
+    @Override
+    public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(PlanTranslatorContext translatorContext,
+            PlanNode parent, LocalExchangeTypeRequire parentRequire) {
+
+        Pair<PlanNode, LocalExchangeType> enforceResult = enforceRequire(
+                translatorContext, children.get(0), 0, LocalExchangeTypeRequire.requirePassthrough());

Review Comment:
   The BE operator requires PASSTHROUGH, and the FE planner mirrors BE's 
`required_data_distribution` 1:1, so this matches: 
`AssertNumRowsOperatorX::required_data_distribution()` returns 
`{TLocalPartitionType::PASSTHROUGH}` (`assert_num_rows_operator.h:49`). 
Returning NOOP here would diverge from BE-native behavior. The "only one row" 
observation is about output cardinality, but `required_data_distribution` 
describes the input side — AssertNumRows is serial (1 task) and the PASSTHROUGH 
fans the (possibly N-task) child into it, same as BE.
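   A minimal sketch of the rule being mirrored, assuming a simplified operator model: `Op`, `Dist`, and `requiredInput()` are hypothetical names, not Doris FE or BE APIs. The default follows the rule quoted in the `AddLocalExchange` comment (`_child->is_serial_operator() ? PASSTHROUGH : NOOP`), and the AssertNumRows override always asks for PASSTHROUGH on its input, regardless of its one-row output.

```java
import java.util.List;

public final class RequiredDistributionModel {
    enum Dist { NOOP, PASSTHROUGH }

    // Hypothetical, simplified operator: a name, a serial flag, and children.
    static class Op {
        final String name;
        final boolean serial;
        final List<Op> children;

        Op(String name, boolean serial, List<Op> children) {
            this.name = name;
            this.serial = serial;
            this.children = children;
        }

        // Default rule: PASSTHROUGH behind a serial child, otherwise NOOP.
        Dist requiredInput() {
            return !children.isEmpty() && children.get(0).serial ? Dist.PASSTHROUGH : Dist.NOOP;
        }
    }

    // AssertNumRows is itself serial and always requires PASSTHROUGH on its input side.
    static class AssertNumRows extends Op {
        AssertNumRows(Op child) {
            super("AssertNumRows", true, List.of(child));
        }

        @Override
        Dist requiredInput() {
            return Dist.PASSTHROUGH;
        }
    }

    public static void main(String[] args) {
        Op scan = new Op("Scan", false, List.of());            // N-task child
        Op filter = new Op("Filter", false, List.of(scan));
        Op assertOp = new AssertNumRows(filter);
        Op project = new Op("Project", false, List.of(assertOp));
        System.out.println(filter.requiredInput());   // NOOP
        System.out.println(assertOp.requiredInput()); // PASSTHROUGH
        System.out.println(project.requiredInput());  // PASSTHROUGH (child is serial)
    }
}
```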



##########
fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java:
##########
@@ -824,14 +828,97 @@ public void foreachDownInCurrentFragment(Consumer<PlanNode> visitor) {
         });
     }
 
-    // Operators need to be executed serially. (e.g. finalized agg without key)
-    public boolean isSerialOperator() {
+    /**
+     * Node-level "is this operator inherently serial" property — answers without looking
+     * at the fragment.  Default false; subclasses override (e.g. finalized agg without key,
+     * UNPARTITIONED ExchangeNode with merge sort).
+     *
+     * Use ONLY in framework-internal places where we are already iterating within a
+     * fragment whose serial-source mode is fixed: {@link #shouldResetSerialFlagForChild}
+     * inputs, {@link #createLocalExchange} heavy-op gate, and child.isSerialNode() checks
+     * embedded inside an enforceRequire path.  Do NOT use it when computing a
+     * {@link LocalExchangeNode.LocalExchangeTypeRequire} on a child — call
+     * {@link #isSerialOperatorOnBe} instead.
+     */
+    public boolean isSerialNode() {

Review Comment:
   It has to stay `public`: `Coordinator` (package `org.apache.doris.qe`) calls 
`node.get().isSerialNode()` in `assignScanRanges`, so a cross-package caller 
needs public visibility — `protected` wouldn't compile.



##########
fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java:
##########
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
+import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
+import org.apache.doris.nereids.trees.plans.distribute.PipelineDistributedPlan;
+import org.apache.doris.planner.LocalExchangeNode.LocalExchangeType;
+import org.apache.doris.planner.LocalExchangeNode.LocalExchangeTypeRequire;
+import org.apache.doris.planner.LocalExchangeNode.RequireHash;
+
+/**
+ * FE-side local exchange planner — inserts {@link LocalExchangeNode} into each fragment's
+ * plan tree so that within-fragment data redistribution is decided at planning time
+ * instead of at BE pipeline-build time.
+ *
+ * <h3>When this runs</h3>
+ * Invoked from {@code NereidsPlanner.addLocalExchangeAfterDistribute()} right after
+ * {@code DistributePlanner} has assigned instances to fragments and before the plan is
+ * serialized to BE.  Gated by session variable {@code enable_local_shuffle_planner}
+ * (default true) and {@code enable_local_shuffle}; when either is off this pass is
+ * skipped entirely and BE falls back to its own {@code _plan_local_exchange}.  The two
+ * paths are mutually exclusive: BE consults {@code runtime_state.h::plan_local_shuffle()}
+ * to know whether it should plan LE itself.
+ *
+ * <h3>What it changes</h3>
+ * <ul>
+ *   <li>For each fragment with {@code maxPerBeInstances > 1}, walks the plan tree
+ *       bottom-up via {@link PlanNode#enforceAndDeriveLocalExchange} and inserts
+ *       LocalExchangeNodes where children's output distribution doesn't satisfy the
+ *       parent's requirement.</li>
+ *   <li>May wrap the fragment root with an extra PASSTHROUGH LE so the data sink
+ *       (DataStreamSink / OlapTableSink) runs with the full instance count even when
+ *       the root operator is serial — see {@link #addLocalExchangeForFragment}.</li>
+ *   <li>Does NOT modify the fragment sink itself, fragment boundaries, or instance
+ *       assignment.</li>
+ * </ul>
+ *
+ * <h3>Per-BE instance semantics</h3>
+ * Skips fragments where every BE has at most 1 instance.  Using a global instance count
+ * would insert LE for "2 BEs × 1 instance" cases, which BE's own
+ * {@code _plan_local_exchange} would not — leading to pipeline task-count mismatch and
+ * deadlock.  See {@link #addLocalExchange}.
+ *
+ * <h3>Reading order</h3>
+ * Start with {@link PlanNode#enforceRequire} (the recursion engine), then individual
+ * {@code enforceAndDeriveLocalExchange} overrides on PlanNode subclasses.
+ */
+public class AddLocalExchange {
+    /** addLocalExchange with distributed plans, skipping single-instance fragments.
+     *  BE's _plan_local_exchange checks _num_instances which is the per-BE instance count.
+     *  With _num_instances<=1 all pipelines on that BE have 1 task so local exchange is a no-op.
+     *  We must use the same per-BE semantics: skip when every BE has at most 1 instance.
+     *  Using global instanceCount would insert LE for fragments where 2 BEs each have 1 instance
+     *  (global=2, per-BE=1), causing pipeline task mismatch and deadlock. */
+    public void addLocalExchange(FragmentIdMapping<DistributedPlan> distributedPlans,
+            PlanTranslatorContext context) {
+        for (DistributedPlan plan : distributedPlans.values()) {
+            PipelineDistributedPlan pipePlan = (PipelineDistributedPlan) plan;
+            long maxPerBeInstances = pipePlan.getInstanceJobs().stream()
+                    .collect(java.util.stream.Collectors.groupingBy(
+                            j -> j.getAssignedWorker().id(), java.util.stream.Collectors.counting()))
+                    .values().stream().mapToLong(Long::longValue).max().orElse(0);
+            if (maxPerBeInstances <= 1) {
+                continue;
+            }
+            PlanFragment fragment = pipePlan.getFragmentJob().getFragment();
+            addLocalExchangeForFragment(fragment, context);
+        }
+    }
+
+    private void addLocalExchangeForFragment(PlanFragment fragment, PlanTranslatorContext context) {
+        DataSink sink = fragment.getSink();
+        LocalExchangeTypeRequire require = sink == null
+                ? LocalExchangeTypeRequire.noRequire() : sink.getLocalExchangeTypeRequire();
+        PlanNode root = fragment.getPlanRoot();
+        context.setHasSerialAncestorInPipeline(root, false);
+        Pair<PlanNode, LocalExchangeType> output = root
+                .enforceAndDeriveLocalExchange(context, null, require);
+        PlanNode newRoot = output.first;
+        // The fragment data sink (DataStreamSink, OlapTableSink) runs in the same pipeline
+        // as the root. If the root will be serial on BE, the sink pipeline has 1 task —
+        // only instance 0 sends data, others hang or miss writes.
+        // Insert PASSTHROUGH fan-out so sink runs with _num_instances tasks.
+        // This matches BE-native's default required_data_distribution():
+        //   _child->is_serial_operator() ? PASSTHROUGH : NOOP
+        if (newRoot.isSerialOperatorOnBe(context.getConnectContext())) {
+            newRoot = new LocalExchangeNode(context.nextPlanNodeId(), newRoot,
+                    LocalExchangeType.PASSTHROUGH, null);
+        }
+        if (newRoot != root) {
+            fragment.setPlanRoot(newRoot);
+        }
+    }
+
+    public static boolean isColocated(PlanNode plan) {
+        if (plan instanceof AggregationNode) {
+            return ((AggregationNode) plan).isColocate() && isColocated(plan.getChild(0));
+        } else if (plan instanceof OlapScanNode) {
+            return true;
+        } else if (plan instanceof SelectNode) {
+            return isColocated(plan.getChild(0));
+        } else if (plan instanceof HashJoinNode) {
+            return ((HashJoinNode) plan).isColocate()
+                    && (isColocated(plan.getChild(0)) || isColocated(plan.getChild(1)));
+        } else if (plan instanceof SetOperationNode) {
+            if (!((SetOperationNode) plan).isColocate()) {
+                return false;
+            }
+            for (PlanNode child : plan.getChildren()) {
+                if (isColocated(child)) {
+                    return true;
+                }
+            }
+            return false;
+        } else {
+            return false;
+        }
+    }
+
+    public static LocalExchangeType resolveExchangeType(LocalExchangeTypeRequire require) {

Review Comment:
   I'd prefer to keep `preferType()` as `GLOBAL` and not align it down to 
`LOCAL`. The observation is right (it's not authoritative — 
`resolveExchangeType()` decides the actual type, and `hasPartitionExprs()` only 
uses `isHashShuffle()`), but the value itself is a deliberate safe default:
   
   `GLOBAL_EXECUTION_HASH_SHUFFLE` is the unconditionally valid hash partition 
(full cross-backend redistribution). `LOCAL_EXECUTION_HASH_SHUFFLE` only 
rebalances within a backend, so it's correct *only if* each key's rows are 
already confined to one backend — it carries a precondition. For the generic 
`RequireHash` ("any hash"), the canonical/safe representative should be the 
precondition-free one (GLOBAL). `resolveExchangeType()` then specializes 
RequireHash→LOCAL *only* in the FE-planned intra-fragment context where that 
precondition holds and GLOBAL's `shuffle_idx_to_instance_idx` could be empty.
   
   Making `preferType()` return LOCAL would make the abstract default the 
precondition-bearing type — a latent footgun if any future caller (or 
`noopTo()`) ever used `preferType()` as a real inserted type. I'll add a 
comment near `resolveExchangeType()`/`RequireHash` documenting that the LOCAL 
specialization is intentional and scoped, rather than changing the default.



-- 
This is an automated message from the Apache Git Service.