morrySnow commented on code in PR #63366:
URL: https://github.com/apache/doris/pull/63366#discussion_r3372721760


##########
fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java:
##########
@@ -193,4 +198,52 @@ public int getNumInstances() {
     public boolean isBucketShuffle() {
         return distributionMode.equals(DistributionMode.BUCKET_SHUFFLE);
     }
+
+    public boolean isColocate() {
+        return isColocate;
+    }
+
+    @Override
+    public Pair<PlanNode, LocalExchangeType> 
enforceAndDeriveLocalExchange(PlanTranslatorContext translatorContext,
+            PlanNode parent, LocalExchangeTypeRequire parentRequire) {
+        LocalExchangeTypeRequire requireChild;
+        LocalExchangeType outputType;
+        PlanNode firstChild = children.isEmpty() ? null : children.get(0);
+        if (this instanceof UnionNode) {
+            // Propagate parent's hash requirement to children ONLY when a 
downstream operator
+            // requires shuffle for correctness (not just performance 
optimization). Matches BE's

Review Comment:
   **🟡 Minor**: When `children.isEmpty()` (constant-output union), `firstChild` 
is `null`. It is then passed to `resolveExchangeType()`. That is safe today because 
`child` is unused, but fragile if the method ever starts using the parameter. 
Consider adding a guard or a comment noting that `resolveExchangeType` does not 
dereference `child`.



##########
fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java:
##########
@@ -0,0 +1,182 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
+import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
+import org.apache.doris.nereids.trees.plans.distribute.PipelineDistributedPlan;
+import org.apache.doris.planner.LocalExchangeNode.LocalExchangeType;
+import org.apache.doris.planner.LocalExchangeNode.LocalExchangeTypeRequire;
+import org.apache.doris.planner.LocalExchangeNode.RequireHash;
+
+/**
+ * FE-side local exchange planner -- inserts {@link LocalExchangeNode} into 
each fragment's
+ * plan tree so that within-fragment data redistribution is decided at 
planning time
+ * instead of at BE pipeline-build time.
+ *
+ * <h3>When this runs</h3>
+ * Invoked from {@code NereidsPlanner.addLocalExchangeAfterDistribute()} right 
after
+ * {@code DistributePlanner} has assigned instances to fragments and before 
the plan is
+ * serialized to BE.  Gated by session variable {@code 
enable_local_shuffle_planner}
+ * (default true) and {@code enable_local_shuffle}; when either is off this 
pass is
+ * skipped entirely and BE falls back to its own {@code _plan_local_exchange}. 
 The two
+ * paths are mutually exclusive: BE consults {@code 
runtime_state.h::plan_local_shuffle()}
+ * to know whether it should plan LE itself.
+ *
+ * <h3>What it changes</h3>
+ * <ul>
+ *   <li>For each fragment with {@code maxPerBeInstances > 1}, walks the plan 
tree
+ *       bottom-up via {@link PlanNode#enforceAndDeriveLocalExchange} and 
inserts
+ *       LocalExchangeNodes where children's output distribution doesn't 
satisfy the
+ *       parent's requirement.</li>
+ *   <li>May wrap the fragment root with an extra PASSTHROUGH LE so the data 
sink
+ *       (DataStreamSink / OlapTableSink) runs with the full instance count 
even when
+ *       the root operator is serial -- see {@link 
#addLocalExchangeForFragment}.</li>
+ *   <li>Does NOT modify the fragment sink itself, fragment boundaries, or 
instance
+ *       assignment.</li>
+ * </ul>
+ *
+ * <h3>Per-BE instance semantics</h3>
+ * Skips fragments where every BE has at most 1 instance.  Using a global 
instance count
+ * would insert LE for "2 BEs × 1 instance" cases, which BE's own
+ * {@code _plan_local_exchange} would not -- leading to pipeline task-count 
mismatch and
+ * deadlock.  See {@link #addLocalExchange}.
+ *
+ * <h3>Reading order</h3>
+ * Start with {@link PlanNode#enforceRequire} (the recursion engine), then 
individual
+ * {@code enforceAndDeriveLocalExchange} overrides on PlanNode subclasses.
+ */
+public class AddLocalExchange {
+    /** addLocalExchange with distributed plans, skipping single-instance 
fragments.
+     *  BE's _plan_local_exchange checks _num_instances which is the per-BE 
instance count.
+     *  With _num_instances<=1 all pipelines on that BE have 1 task so local 
exchange is a no-op.
+     *  We must use the same per-BE semantics: skip when every BE has at most 
1 instance.
+     *  Using global instanceCount would insert LE for fragments where 2 BEs 
each have 1 instance
+     *  (global=2, per-BE=1), causing pipeline task mismatch and deadlock. */
+    public void addLocalExchange(FragmentIdMapping<DistributedPlan> 
distributedPlans,
+            PlanTranslatorContext context) {
+        for (DistributedPlan plan : distributedPlans.values()) {
+            PipelineDistributedPlan pipePlan = (PipelineDistributedPlan) plan;
+            long maxPerBeInstances = pipePlan.getInstanceJobs().stream()
+                    .collect(java.util.stream.Collectors.groupingBy(
+                            j -> j.getAssignedWorker().id(), 
java.util.stream.Collectors.counting()))
+                    
.values().stream().mapToLong(Long::longValue).max().orElse(0);
+            if (maxPerBeInstances <= 1) {
+                continue;
+            }
+            PlanFragment fragment = pipePlan.getFragmentJob().getFragment();
+            addLocalExchangeForFragment(fragment, context);
+        }
+    }
+
+    private void addLocalExchangeForFragment(PlanFragment fragment, 
PlanTranslatorContext context) {
+        DataSink sink = fragment.getSink();
+        LocalExchangeTypeRequire require = sink == null
+                ? LocalExchangeTypeRequire.noRequire() : 
sink.getLocalExchangeTypeRequire();
+        PlanNode root = fragment.getPlanRoot();
+        context.setHasSerialAncestorInPipeline(root, false);
+        Pair<PlanNode, LocalExchangeType> output = root
+                .enforceAndDeriveLocalExchange(context, null, require);
+        PlanNode newRoot = output.first;
+        // The fragment data sink (DataStreamSink, OlapTableSink) runs in the 
same pipeline
+        // as the root. If the root will be serial on BE, the sink pipeline 
has 1 task --
+        // only instance 0 sends data, others hang or miss writes.
+        // Insert PASSTHROUGH fan-out so sink runs with _num_instances tasks.
+        // This matches BE-native's default required_data_distribution():
+        //   _child->is_serial_operator() ? PASSTHROUGH : NOOP
+        if (newRoot.isSerialOperatorOnBe(context.getConnectContext())) {
+            newRoot = new LocalExchangeNode(context.nextPlanNodeId(), newRoot,
+                    LocalExchangeType.PASSTHROUGH, null);
+        }
+        if (newRoot != root) {
+            fragment.setPlanRoot(newRoot);
+        }
+        validateNoSerialWithoutLocalExchange(fragment.getPlanRoot(), 
context.getConnectContext());
+    }
+
+    /**
+     * In a local-shuffle fragment, the root check above guarantees the root 
pipeline
+     * has N tasks. Any serial operator reduces its pipeline to 1 task. If 
this serial
+     * operator feeds into a non-serial parent without LocalExchangeNode in 
between,
+     * some pipelines have 1 task while others have N → shared_state mismatch, 
data loss.
+     *
+     * Serial→serial chains are fine (all at 1 task, consistent). Only the 
transition
+     * from serial to non-serial needs LE to restore parallelism.
+     */
+    private void validateNoSerialWithoutLocalExchange(PlanNode node,
+            org.apache.doris.qe.ConnectContext context) {
+        for (PlanNode child : node.getChildren()) {
+            validateNoSerialWithoutLocalExchange(child, context);
+            if (child.isSerialOperatorOnBe(context)
+                    && !(child instanceof LocalExchangeNode)
+                    && !(node instanceof LocalExchangeNode)
+                    && !(node instanceof ExchangeNode)
+                    && !node.isSerialOperatorOnBe(context)) {
+                
org.apache.logging.log4j.LogManager.getLogger(AddLocalExchange.class).warn(
+                        "Serial " + child.getClass().getSimpleName() + "(id=" 
+ child.getId()
+                        + ") feeds into non-serial " + 
node.getClass().getSimpleName()
+                        + "(id=" + node.getId() + ") without LocalExchangeNode"
+                        + " in fragment " + node.getFragment().getFragmentId()
+                        + ". FE should insert LocalExchangeNode to restore 
parallelism.");
+            }
+        }
+    }
+
+    public static boolean isColocated(PlanNode plan) {
+        if (plan instanceof AggregationNode) {
+            return ((AggregationNode) plan).isColocate() && 
isColocated(plan.getChild(0));
+        } else if (plan instanceof OlapScanNode) {
+            return true;
+        } else if (plan instanceof SelectNode) {
+            return isColocated(plan.getChild(0));
+        } else if (plan instanceof HashJoinNode) {
+            return ((HashJoinNode) plan).isColocate()
+                    && (isColocated(plan.getChild(0)) || 
isColocated(plan.getChild(1)));
+        } else if (plan instanceof SetOperationNode) {
+            if (!((SetOperationNode) plan).isColocate()) {
+                return false;
+            }
+            for (PlanNode child : plan.getChildren()) {
+                if (isColocated(child)) {
+                    return true;
+                }

Review Comment:
   **🟡 Minor**: `resolveExchangeType()` accepts a `child` parameter that is 
never used. This is confusing at call sites: for example, `SetOperationNode` passes a 
potentially-null `firstChild` (when `children.isEmpty()`, line 214). It's safe 
today because `child` is unused, but if someone later adds code that accesses 
`child`, it could throw an NPE on empty-children Union nodes. Consider removing the 
unused parameter.



##########
be/src/exec/pipeline/pipeline_fragment_context.h:
##########
@@ -320,8 +322,11 @@ class PipelineFragmentContext : public 
TaskExecutionContext {
 
     std::mutex _state_map_lock;
 
-    int _operator_id = 0;
-    int _sink_operator_id = 0;
+    // Start from -1 so all operator IDs are negative. This avoids collision 
with

Review Comment:
   **🟢 Note**: `_operator_id` and `_sink_operator_id` now start from `-1` 
instead of `0` to avoid collision with unpaired sinks (e.g., `OlapTableSink`) 
that hardcode `dest_id=0`. This is a clean fix. Verify that no code path in the 
BE enumerates or displays operator IDs expecting them to be non-negative; 
negative IDs in logs or profiles could confuse debugging.



##########
be/src/exec/pipeline/pipeline_fragment_context.cpp:
##########
@@ -690,6 +716,199 @@ Status 
PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const Descrip
     return Status::OK();
 }
 
+Status PipelineFragmentContext::_create_deferred_local_exchangers() {
+    for (auto& info : _deferred_exchangers) {
+        // DANGER ZONE -- do not "fix" this line without reading the history.
+        //
+        // sender_count seeds Exchanger::_running_sink_operators, which the 
source side
+        // waits to reach 0 via sub_running_sink_operators on each sink 
LocalState close.
+        // The correct value is THIS pipeline-instance's sink task count, 
which is exactly
+        // info.upstream_pipe->num_tasks() -- one PipelineTask per task, one 
close per task.
+        //
+        // Tempting wrong fix #1: `std::max(num_tasks, _num_instances)` to 
mirror the
+        //   BE-planned path in _add_local_exchange_impl (~line 1023).  THIS 
BREAKS the
+        //   common FE-planned shape of `serial scan → LE(PT) → ...`: 
upstream_pipe
+        //   genuinely has num_tasks=1, only 1 close arrives, but seed becomes
+        //   _num_instances so _running_sink_operators never reaches 0 -- 
downstream
+        //   sources hang on SHUFFLE_DATA_DEPENDENCY (e.g. MTMV refresh from
+        //   mtmv_up_down_job_p0/load.groovy stays at Status=RUNNING; build 
949402
+        //   regressed exactly this way).  BE-planned mode uses max() because 
its
+        //   `cur_pipe` is the source-side pipeline (always raised to 
_num_instances by
+        //   add_pipeline) -- not analogous to our `upstream_pipe` here, which 
is the
+        //   sink-side pipeline that may legitimately stay at 1 for serial 
sources.
+        //
+        // Tempting wrong fix #2: multiply by _num_instances on the theory 
shared_state
+        //   is shared across all instances.  Same hang -- each 
fragment-instance
+        //   PipelineFragmentContext has its OWN _op_id_to_shared_state map, 
so the
+        //   exchanger is per-instance, not per-BE.  num_tasks() is already 
the right
+        //   close-count for one instance.
+        //
+        // If a hang shows up with `_running_sink_operators < 0`, the bug is 
upstream:
+        // _propagate_local_exchange_num_tasks left num_tasks too low (or too 
high) for
+        // this fragment shape.  Fix THAT pass, not this seed value.
+        const int sender_count = info.upstream_pipe->num_tasks();

Review Comment:
   **🟠 Medium**: The two-pass `while (changed)` loops in 
`_propagate_local_exchange_num_tasks()` rely on monotonic convergence 
(num_tasks only increases in Pass 1, only decreases in Pass 2). If a future 
code change introduces a non-monotonic update, the loops could run forever. 
Consider adding a max-iteration guard (`while (changed && ++iter_count < 
MAX_ITERATIONS)`) with a `DCHECK` or `LOG(WARNING)` when the limit is exceeded, so 
a future regression fails loudly rather than hanging.
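
The guard can be sketched as a small bounded fixed-point helper. This is only an illustration, written in Java to match the FE hunks in this review; the BE change would be the C++ equivalent. `BoundedFixedPoint`, `run`, and `maxIterations` are hypothetical names, not existing Doris code.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper for illustration; not part of Doris. */
final class BoundedFixedPoint {
    private BoundedFixedPoint() {
    }

    /**
     * Calls {@code pass} until it reports that nothing changed.
     *
     * @param pass one propagation sweep; returns true if it changed anything
     * @param maxIterations upper bound on sweeps (an assumed tuning knob)
     * @return the number of sweeps executed, including the final no-change sweep
     * @throws IllegalStateException if every one of maxIterations sweeps changed something
     */
    static int run(BooleanSupplier pass, int maxIterations) {
        for (int iter = 1; iter <= maxIterations; iter++) {
            if (!pass.getAsBoolean()) {
                return iter;
            }
        }
        throw new IllegalStateException(
                "propagation did not converge within " + maxIterations + " sweeps");
    }
}
```

A bound derived from the number of pipelines in the fragment might be a natural choice, though that is an assumption about the propagation pass rather than something visible in this diff.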



##########
fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java:
##########
@@ -0,0 +1,182 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
+import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
+import org.apache.doris.nereids.trees.plans.distribute.PipelineDistributedPlan;
+import org.apache.doris.planner.LocalExchangeNode.LocalExchangeType;
+import org.apache.doris.planner.LocalExchangeNode.LocalExchangeTypeRequire;
+import org.apache.doris.planner.LocalExchangeNode.RequireHash;
+
+/**
+ * FE-side local exchange planner -- inserts {@link LocalExchangeNode} into 
each fragment's
+ * plan tree so that within-fragment data redistribution is decided at 
planning time
+ * instead of at BE pipeline-build time.
+ *
+ * <h3>When this runs</h3>
+ * Invoked from {@code NereidsPlanner.addLocalExchangeAfterDistribute()} right 
after
+ * {@code DistributePlanner} has assigned instances to fragments and before 
the plan is
+ * serialized to BE.  Gated by session variable {@code 
enable_local_shuffle_planner}
+ * (default true) and {@code enable_local_shuffle}; when either is off this 
pass is
+ * skipped entirely and BE falls back to its own {@code _plan_local_exchange}. 
 The two
+ * paths are mutually exclusive: BE consults {@code 
runtime_state.h::plan_local_shuffle()}
+ * to know whether it should plan LE itself.
+ *
+ * <h3>What it changes</h3>
+ * <ul>
+ *   <li>For each fragment with {@code maxPerBeInstances > 1}, walks the plan 
tree
+ *       bottom-up via {@link PlanNode#enforceAndDeriveLocalExchange} and 
inserts
+ *       LocalExchangeNodes where children's output distribution doesn't 
satisfy the
+ *       parent's requirement.</li>
+ *   <li>May wrap the fragment root with an extra PASSTHROUGH LE so the data 
sink
+ *       (DataStreamSink / OlapTableSink) runs with the full instance count 
even when
+ *       the root operator is serial -- see {@link 
#addLocalExchangeForFragment}.</li>
+ *   <li>Does NOT modify the fragment sink itself, fragment boundaries, or 
instance
+ *       assignment.</li>
+ * </ul>
+ *
+ * <h3>Per-BE instance semantics</h3>
+ * Skips fragments where every BE has at most 1 instance.  Using a global 
instance count
+ * would insert LE for "2 BEs × 1 instance" cases, which BE's own
+ * {@code _plan_local_exchange} would not -- leading to pipeline task-count 
mismatch and
+ * deadlock.  See {@link #addLocalExchange}.
+ *
+ * <h3>Reading order</h3>
+ * Start with {@link PlanNode#enforceRequire} (the recursion engine), then 
individual
+ * {@code enforceAndDeriveLocalExchange} overrides on PlanNode subclasses.
+ */
+public class AddLocalExchange {
+    /** addLocalExchange with distributed plans, skipping single-instance 
fragments.
+     *  BE's _plan_local_exchange checks _num_instances which is the per-BE 
instance count.
+     *  With _num_instances<=1 all pipelines on that BE have 1 task so local 
exchange is a no-op.
+     *  We must use the same per-BE semantics: skip when every BE has at most 
1 instance.
+     *  Using global instanceCount would insert LE for fragments where 2 BEs 
each have 1 instance
+     *  (global=2, per-BE=1), causing pipeline task mismatch and deadlock. */
+    public void addLocalExchange(FragmentIdMapping<DistributedPlan> 
distributedPlans,
+            PlanTranslatorContext context) {
+        for (DistributedPlan plan : distributedPlans.values()) {
+            PipelineDistributedPlan pipePlan = (PipelineDistributedPlan) plan;
+            long maxPerBeInstances = pipePlan.getInstanceJobs().stream()
+                    .collect(java.util.stream.Collectors.groupingBy(
+                            j -> j.getAssignedWorker().id(), 
java.util.stream.Collectors.counting()))
+                    
.values().stream().mapToLong(Long::longValue).max().orElse(0);
+            if (maxPerBeInstances <= 1) {
+                continue;
+            }
+            PlanFragment fragment = pipePlan.getFragmentJob().getFragment();
+            addLocalExchangeForFragment(fragment, context);
+        }
+    }
+
+    private void addLocalExchangeForFragment(PlanFragment fragment, 
PlanTranslatorContext context) {
+        DataSink sink = fragment.getSink();
+        LocalExchangeTypeRequire require = sink == null
+                ? LocalExchangeTypeRequire.noRequire() : 
sink.getLocalExchangeTypeRequire();
+        PlanNode root = fragment.getPlanRoot();
+        context.setHasSerialAncestorInPipeline(root, false);
+        Pair<PlanNode, LocalExchangeType> output = root
+                .enforceAndDeriveLocalExchange(context, null, require);
+        PlanNode newRoot = output.first;
+        // The fragment data sink (DataStreamSink, OlapTableSink) runs in the 
same pipeline
+        // as the root. If the root will be serial on BE, the sink pipeline 
has 1 task --
+        // only instance 0 sends data, others hang or miss writes.
+        // Insert PASSTHROUGH fan-out so sink runs with _num_instances tasks.
+        // This matches BE-native's default required_data_distribution():
+        //   _child->is_serial_operator() ? PASSTHROUGH : NOOP
+        if (newRoot.isSerialOperatorOnBe(context.getConnectContext())) {
+            newRoot = new LocalExchangeNode(context.nextPlanNodeId(), newRoot,
+                    LocalExchangeType.PASSTHROUGH, null);
+        }
+        if (newRoot != root) {
+            fragment.setPlanRoot(newRoot);
+        }
+        validateNoSerialWithoutLocalExchange(fragment.getPlanRoot(), 
context.getConnectContext());
+    }
+
+    /**
+     * In a local-shuffle fragment, the root check above guarantees the root 
pipeline
+     * has N tasks. Any serial operator reduces its pipeline to 1 task. If 
this serial
+     * operator feeds into a non-serial parent without LocalExchangeNode in 
between,
+     * some pipelines have 1 task while others have N → shared_state mismatch, 
data loss.
+     *
+     * Serial→serial chains are fine (all at 1 task, consistent). Only the 
transition
+     * from serial to non-serial needs LE to restore parallelism.
+     */
+    private void validateNoSerialWithoutLocalExchange(PlanNode node,
+            org.apache.doris.qe.ConnectContext context) {
+        for (PlanNode child : node.getChildren()) {
+            validateNoSerialWithoutLocalExchange(child, context);
+            if (child.isSerialOperatorOnBe(context)
+                    && !(child instanceof LocalExchangeNode)

Review Comment:
   **🟡 Observation**: `validateNoSerialWithoutLocalExchange()` only logs a 
warning when the invariant is violated; it does not throw. If a 
serial→non-serial transition occurs without an LE in between, queries could 
produce wrong results silently. Consider: (1) in debug/test builds, throw or 
DCHECK; (2) add a metric counter so operators can monitor this in production 
before it causes user-visible issues.
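
Both suggestions can be combined in one check, roughly as sketched below. This is a toy model under stated assumptions: `Node`, the `strict` flag, and the `VIOLATIONS` counter are hypothetical stand-ins rather than Doris planner or metrics classes, and the `ExchangeNode` exemption from the diff is left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/** Toy model for illustration; these are stand-ins, not the Doris planner classes. */
final class SerialBoundaryCheck {
    /** Minimal plan node carrying only the flags the check needs. */
    static final class Node {
        final String name;
        final boolean serial;
        final boolean localExchange;
        final List<Node> children = new ArrayList<>();

        Node(String name, boolean serial, boolean localExchange) {
            this.name = name;
            this.serial = serial;
            this.localExchange = localExchange;
        }

        Node add(Node child) {
            children.add(child);
            return this;
        }
    }

    /** Hypothetical counter; a real build would register it with the metrics system. */
    static final AtomicLong VIOLATIONS = new AtomicLong();

    private SerialBoundaryCheck() {
    }

    /**
     * Flags a serial child feeding a non-serial parent with no local exchange between.
     * Simplified: the ExchangeNode exemption from the diff is omitted.
     */
    static void validate(Node node, boolean strict) {
        for (Node child : node.children) {
            validate(child, strict);
            boolean violated = child.serial && !child.localExchange
                    && !node.localExchange && !node.serial;
            if (!violated) {
                continue;
            }
            // Count every violation so production deployments can alert on it.
            VIOLATIONS.incrementAndGet();
            String msg = "Serial " + child.name + " feeds non-serial " + node.name
                    + " without a local exchange";
            if (strict) {
                // Debug/test builds fail fast instead of producing wrong results.
                throw new IllegalStateException(msg);
            }
            System.err.println("WARN: " + msg);
        }
    }
}
```

In this sketch `strict` would be switched on for regression and unit-test runs, while production keeps the warning plus the counter.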



##########
fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java:
##########
@@ -258,4 +268,155 @@ public boolean isQueryCacheCandidate() {
     public void setQueryCacheCandidate(boolean queryCacheCandidate) {
         this.queryCacheCandidate = queryCacheCandidate;
     }
+
+    @Override
+    public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(
+            PlanTranslatorContext translatorContext, PlanNode parent, 
LocalExchangeTypeRequire parentRequire) {
+
+        ConnectContext connectContext = translatorContext.getConnectContext();
+        SessionVariable sessionVariable = connectContext.getSessionVariable();
+        // PR #62438: when false, non-finalize agg falls back to BE base class.
+        boolean enableLeBeforeAgg = 
sessionVariable.enableLocalExchangeBeforeAgg;
+        boolean hasKeys = !aggInfo.getGroupingExprs().isEmpty();

Review Comment:
   **✅ Correctness fix**: The `!needsFinalize && !aggInfo.isMerge() && 
!enableLeBeforeAgg` guard correctly separates FIRST_MERGE 
(correctness-required, always HASH) from LOCAL phase (performance-only, flag 
controls). This diverges from BE's `!_needs_finalize && 
!enable_local_exchange_before_agg` early-return, which conflates both intents, so 
the FE planner is actually *more correct* here than BE. For the backward-compat 
path (legacy BE planner), the bug DORIS-25413 remains; consider adding a 
comment noting this divergence explicitly.
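
To make the divergence concrete, the two guards quoted above can be compared over every flag combination. The sketch below copies only the boolean expressions from this comment; the class and method names are hypothetical, and it says nothing about what each planner does after the guard fires.

```java
import java.util.ArrayList;
import java.util.List;

/** Compares the two guards quoted above; names other than the flags are hypothetical. */
final class AggGuardComparison {
    private AggGuardComparison() {
    }

    /** FE guard as quoted in this comment. */
    static boolean feGuard(boolean needsFinalize, boolean isMerge, boolean enableLeBeforeAgg) {
        return !needsFinalize && !isMerge && !enableLeBeforeAgg;
    }

    /** BE early-return condition as quoted in this comment. */
    static boolean beGuard(boolean needsFinalize, boolean enableLeBeforeAgg) {
        return !needsFinalize && !enableLeBeforeAgg;
    }

    /** Enumerates all eight flag combinations and returns those where the guards differ. */
    static List<String> divergentCases() {
        List<String> result = new ArrayList<>();
        boolean[] values = {false, true};
        for (boolean needsFinalize : values) {
            for (boolean isMerge : values) {
                for (boolean flag : values) {
                    if (feGuard(needsFinalize, isMerge, flag) != beGuard(needsFinalize, flag)) {
                        result.add("needsFinalize=" + needsFinalize + ", isMerge=" + isMerge
                                + ", enableLeBeforeAgg=" + flag);
                    }
                }
            }
        }
        return result;
    }
}
```

The only combination where the two expressions disagree is a non-finalize merge aggregation with the flag off, which is the FIRST_MERGE case described above.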



##########
fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java:
##########
@@ -285,4 +302,71 @@ public List<Expr> getOtherJoinConjuncts() {
     public List<Expr> getMarkJoinConjuncts() {
         return markJoinConjuncts;
     }
+
+    @Override
+    public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(
+            PlanTranslatorContext translatorContext, PlanNode parent, 
LocalExchangeTypeRequire parentRequire) {
+
+        LocalExchangeTypeRequire probeSideRequire;
+        LocalExchangeTypeRequire buildSideRequire;
+        LocalExchangeType outputType = null;
+
+        if (joinOp == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN) {
+            buildSideRequire = probeSideRequire = 
LocalExchangeTypeRequire.noRequire();
+            outputType = LocalExchangeType.NOOP;
+        } else if (distrMode == DistributionMode.BROADCAST) {
+            // BE: _child->is_serial_operator() ? PASSTHROUGH/PASS_TO_ONE : 
NOOP
+            boolean probeChildSerial = children.get(0).isSerialOperatorOnBe(
+                    translatorContext.getConnectContext());
+            boolean buildChildSerial = children.get(1).isSerialOperatorOnBe(
+                    translatorContext.getConnectContext());
+            probeSideRequire = probeChildSerial
+                    ? LocalExchangeTypeRequire.requirePassthrough()
+                    : LocalExchangeTypeRequire.noRequire();
+            buildSideRequire = buildChildSerial
+                    ? LocalExchangeTypeRequire.requirePassToOne()
+                    : LocalExchangeTypeRequire.noRequire();
+            // For serial probe: output is PASSTHROUGH (data from single 
instance).
+            // For non-serial probe: propagate probe side's actual 
distribution.
+            outputType = probeChildSerial ? LocalExchangeType.PASSTHROUGH : 
null;
+        } else if (isColocate() || isBucketShuffle()) {
+            probeSideRequire = LocalExchangeTypeRequire.requireBucketHash();
+            // For BUCKET_SHUFFLE with serial build child: use 
requireBucketHash() (not
+            // requirePassToOne()). Unlike BROADCAST joins, BUCKET_SHUFFLE has 
no shared
+            // hash table mechanism -- PASS_TO_ONE routes all data to task 0 
while tasks 1..N-1
+            // build empty hash tables, losing rows. BUCKET_HASH_SHUFFLE 
correctly distributes
+            // build data by bucket to match the probe side's bucket 
distribution.
+            // The serial exchange returns NOOP, so enforceRequire() will 
insert a
+            // BUCKET_HASH_SHUFFLE local exchange (with PASSTHROUGH fan-out 
for heavy-ops

Review Comment:
   **🟡 Clarification**: For Colocate/BucketShuffle join, both probe and build 
sides use `requireBucketHash()`. For a serial build child (e.g., BucketShuffle 
join where the build Exchange is serial), the framework will insert a 
`PASSTHROUGH → BUCKET_HASH_SHUFFLE` chain via the heavy-ops gate. This is 
correct, but the comment only mentions the probe side; consider adding a brief 
note about the build side's serial handling too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
