This is an automated email from the ASF dual-hosted git repository.

huajianlan pushed a commit to branch fe_local_shuffle
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 212f2fb6570730813d0c88098e5010394e4b2348
Author: 924060929 <[email protected]>
AuthorDate: Tue Mar 31 14:33:33 2026 +0800

    [fix](local shuffle) align FE local exchange planning with BE pipeline model
    
    Key changes to match BE's _plan_local_exchange behavior:
    
    1. AddLocalExchange + PlanTranslatorContext: track isLocalShuffleFragment
       flag (true only for LocalShuffleAssignedJob / pooling-scan fragments).
       Skip single-instance fragments entirely — BE's _plan_local_exchange
       processes all pipelines but with num_instances=1 local exchange is a
       no-op; skipping avoids mismatched sender/receiver counts.
       Insert PASSTHROUGH at fragment root when isLocalShuffle &&
       newRoot.isSerialOperator() — mirrors BE base-class
       required_data_distribution() returning PASSTHROUGH for serial children
       so all _num_instances sink tasks run and send EOS.
    
    2. PlanNode enforceChild / enforceChildExchange / forceEnforceChildExchange:
       heavy_ops bottleneck avoidance — when child is serial and exchange is
       heavy (HASH/BUCKET_HASH/ADAPTIVE_PASSTHROUGH), insert PASSTHROUGH
       fan-out first.  Only for pooling-scan (isLocalShuffleFragment) where
       serial means exactly 1 pipeline task.
    
    3. SortNode: check isAnalyticSort first (mirrors BE _is_analytic_sort flag);
       remove dead fragment.useSerialSource() + instanceof ScanNode branch that
       has been superseded by the AddLocalExchange PASSTHROUGH-at-root logic.
    
    4. ExchangeNode: serial Exchange (UNPARTITIONED or use_serial_exchange=true)
       returns NOOP — mirrors BE 
ExchangeSourceOperatorX::required_data_distribution().
    
    5. ThriftPlansBuilder: simplify computeExchangeSenderNum — remove
       senderFragmentOutputsSerially logic that tried to adjust sender count for
       serial output pipelines.  With the PASSTHROUGH-at-root fix, all fragment
       instances always create sink tasks, so sender count = 
instanceJobs.size().
---
 be/src/exec/pipeline/pipeline_fragment_context.cpp |  2 -
 .../glue/translator/PlanTranslatorContext.java     | 15 ++++++++
 .../org/apache/doris/planner/AddLocalExchange.java | 37 ++++++++++++++----
 .../org/apache/doris/planner/ExchangeNode.java     | 10 ++++-
 .../java/org/apache/doris/planner/PlanNode.java    | 44 ++++++++++++++++++----
 .../java/org/apache/doris/planner/SortNode.java    | 17 +++------
 .../doris/qe/runtime/ThriftPlansBuilder.java       | 41 ++------------------
 7 files changed, 100 insertions(+), 66 deletions(-)

diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp 
b/be/src/exec/pipeline/pipeline_fragment_context.cpp
index 9a4d14494ea..abfe3048fff 100644
--- a/be/src/exec/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp
@@ -687,8 +687,6 @@ Status 
PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const Descrip
 
 Status PipelineFragmentContext::_create_deferred_local_exchangers() {
     for (auto& info : _deferred_exchangers) {
-        // num_tasks raise is handled globally in 
_build_and_prepare_full_pipeline
-        // after this function returns.  No per-exchanger adjustment needed 
here.
         const int sender_count = info.upstream_pipe->num_tasks();
         switch (info.partition_type) {
         case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE:
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
index afd1a8d26d3..43088d071ea 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
@@ -118,6 +118,13 @@ public class PlanTranslatorContext {
 
     private final Map<PlanNodeId, Boolean> serialAncestorInPipelineMap = 
Maps.newHashMap();
 
+    // Whether the current fragment uses LocalShuffleAssignedJob (pooling scan 
with
+    // ignoreDataDistribution → _parallel_instances=1 in BE). When true, 
serial operators
+    // indicate real pipeline bottlenecks needing PASSTHROUGH fan-out 
(heavy_ops).
+    // When false (regular bucket distribution), each instance has its own 
scan range
+    // and num_tasks matches _num_instances, so no fan-out is needed.
+    private boolean isLocalShuffleFragment = false;
+
     private boolean isTopMaterializeNode = true;
 
     private final Set<SlotId> virtualColumnIds = Sets.newHashSet();
@@ -255,6 +262,14 @@ public class PlanTranslatorContext {
         return serialAncestorInPipelineMap.getOrDefault(node.getId(), false);
     }
 
+    public void setIsLocalShuffleFragment(boolean isLocalShuffle) {
+        this.isLocalShuffleFragment = isLocalShuffle;
+    }
+
+    public boolean isLocalShuffleFragment() {
+        return isLocalShuffleFragment;
+    }
+
     public SlotDescriptor addSlotDesc(TupleDescriptor t) {
         return descTable.addSlotDescriptor(t);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java
index d982ab80385..120125ff008 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java
@@ -32,35 +32,58 @@ import java.util.List;
 public class AddLocalExchange {
     public void addLocalExchange(List<PlanFragment> fragments, 
PlanTranslatorContext context) {
         for (PlanFragment fragment : fragments) {
-            addLocalExchangeForFragment(fragment, context);
+            addLocalExchangeForFragment(fragment, context, false);
         }
     }
 
-    /** addLocalExchange with distributed plans, skipping single-instance 
fragments (matching BE behavior) */
+    /** addLocalExchange with distributed plans, skipping single-instance 
fragments.
+     *  BE's _plan_local_exchange processes all pipelines regardless of 
_num_instances,
+     *  but with _num_instances==1 all pipelines have 1 task so local exchange 
is a no-op.
+     *  Skipping avoids inserting LOCAL_EXCHANGE_NODEs that change pipeline 
structure
+     *  without benefit and may cause sender/receiver count mismatches. */
     public void addLocalExchange(FragmentIdMapping<DistributedPlan> 
distributedPlans,
             PlanTranslatorContext context) {
         for (DistributedPlan plan : distributedPlans.values()) {
             PipelineDistributedPlan pipePlan = (PipelineDistributedPlan) plan;
             int instanceCount = pipePlan.getInstanceJobs().size();
-            // Match BE's early-return: if (_num_instances <= 1) return OK();
             if (instanceCount <= 1) {
                 continue;
             }
             PlanFragment fragment = pipePlan.getFragmentJob().getFragment();
-            addLocalExchangeForFragment(fragment, context);
+            boolean isLocalShuffle = pipePlan.getInstanceJobs().stream()
+                    .anyMatch(j -> j
+                            instanceof 
org.apache.doris.nereids.trees.plans.distribute
+                                    .worker.job.LocalShuffleAssignedJob);
+            addLocalExchangeForFragment(fragment, context, isLocalShuffle);
         }
     }
 
-    private void addLocalExchangeForFragment(PlanFragment fragment, 
PlanTranslatorContext context) {
+    /** Inserts any required LOCAL_EXCHANGE_NODEs into this fragment's plan tree in place. */
+    private void addLocalExchangeForFragment(PlanFragment fragment, 
PlanTranslatorContext context,
+            boolean isLocalShuffle) {
         DataSink sink = fragment.getSink();
         LocalExchangeTypeRequire require = sink == null
                 ? LocalExchangeTypeRequire.noRequire() : 
sink.getLocalExchangeTypeRequire();
         PlanNode root = fragment.getPlanRoot();
         context.setHasSerialAncestorInPipeline(root, false);
+        context.setIsLocalShuffleFragment(isLocalShuffle);
         Pair<PlanNode, LocalExchangeType> output = root
                 .enforceAndDeriveLocalExchange(context, null, require);
-        if (output.first != root) {
-            fragment.setPlanRoot(output.first);
+        PlanNode newRoot = output.first;
+        // Mirror BE OperatorBase base class required_data_distribution():
+        // when child (fragment root) is serial AND the fragment uses pooling 
scan
+        // (LocalShuffleAssignedJob → ignoreDataDistribution → 
_parallel_instances=1),
+        // the DataStreamSink pipeline has num_tasks=1 so only instance 0 
creates a
+        // task and sends EOS. Downstream receivers expect _num_instances 
EOSes → hang.
+        // Insert PASSTHROUGH fan-out to create _num_instances sink tasks.
+        // Non-pooling fragments (regular bucket distribution) have 
_num_instances ==
+        // bucket_count and every instance gets a scan range, so all sink 
tasks run.
+        if (isLocalShuffle && newRoot.isSerialOperator()) {
+            newRoot = new LocalExchangeNode(context.nextPlanNodeId(), newRoot,
+                    LocalExchangeType.PASSTHROUGH, null);
+        }
+        if (newRoot != root) {
+            fragment.setPlanRoot(newRoot);
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
index 165b4d3756e..508788717c4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
@@ -180,7 +180,15 @@ public class ExchangeNode extends PlanNode {
     @Override
     public Pair<PlanNode, LocalExchangeType> 
enforceAndDeriveLocalExchange(PlanTranslatorContext translatorContext,
             PlanNode parent, LocalExchangeTypeRequire parentRequire) {
-        if (partitionType == TPartitionType.HASH_PARTITIONED) {
+        // Mirror BE's ExchangeSourceOperatorX::required_data_distribution():
+        // serial Exchange returns NOOP; otherwise HASH/BUCKET/NOOP based on the
+        // partition type. The serial case covers both:
+        // 1. UNPARTITIONED exchange (naturally serial) → already NOOP in else 
branch
+        // 2. HASH/BUCKET exchange made serial by use_serial_exchange=true → 
override to NOOP
+        if (isSerialOperator()) {
+            return Pair.of(this, LocalExchangeType.NOOP);
+        } else if (partitionType == TPartitionType.HASH_PARTITIONED) {
             return Pair.of(this, 
LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE);
         } else if (partitionType == 
TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED) {
             return Pair.of(this, LocalExchangeType.BUCKET_HASH_SHUFFLE);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index 1bce63f1416..805d419b86b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -984,11 +984,15 @@ public abstract class PlanNode extends TreeNode<PlanNode> 
{
                 return childOutput;
             }
             List<Expr> distributeExprs = childIndex >= 0 ? 
getChildDistributeExprList(childIndex) : null;
-            // Heavy ops bottleneck avoidance (mirrors BE 
pipeline_fragment_context.cpp:1013-1025):
-            // When upstream has 1 task (serial/pooling scan) and exchange is 
heavy (hash shuffle,
-            // bucket hash, adaptive passthrough), insert PASSTHROUGH fan-out 
first to avoid
-            // single-task bottleneck on the heavy exchange sink.
-            if (preferType.isHeavyOperation() && 
childOutput.first.isSerialOperator()) {
+            // Heavy ops bottleneck avoidance (mirrors BE 
pipeline_fragment_context.cpp:1025-1038):
+            // When upstream has 1 task (serial source) and exchange is heavy, 
insert PASSTHROUGH
+            // fan-out first to avoid single-task bottleneck on the heavy 
exchange sink.
+            // Only applies to local-shuffle fragments (pooling scan) where 
_parallel_instances=1
+            // causes serial pipelines to have 1 task. In non-pooling 
fragments, each instance
+            // has its own scan range and all pipelines have _num_instances 
tasks.
+            if (translatorContext.isLocalShuffleFragment()
+                    && preferType.isHeavyOperation() && 
childOutput.first.isSerialOperator()) {
                 PlanNode ptNode = new 
LocalExchangeNode(translatorContext.nextPlanNodeId(),
                         childOutput.first, LocalExchangeType.PASSTHROUGH, 
null);
                 return Pair.of(
@@ -1035,9 +1039,22 @@ public abstract class PlanNode extends 
TreeNode<PlanNode> {
         if (!require.satisfy(childOutput.second)) {
             LocalExchangeType preferType = 
AddLocalExchange.resolveExchangeType(
                     require, translatorContext, this, childOutput.first);
+            List<Expr> distributeExprs = 
getChildDistributeExprList(childIndex);
+            // Heavy ops bottleneck avoidance (same as enforceChild):
+            // serial child + heavy exchange → insert PASSTHROUGH fan-out first
+            // Only for local-shuffle (pooling scan) fragments where serial 
means 1 task.
+            if (translatorContext.isLocalShuffleFragment()
+                    && preferType.isHeavyOperation() && 
childOutput.first.isSerialOperator()) {
+                PlanNode ptNode = new 
LocalExchangeNode(translatorContext.nextPlanNodeId(),
+                        childOutput.first, LocalExchangeType.PASSTHROUGH, 
null);
+                return Pair.of(
+                        new 
LocalExchangeNode(translatorContext.nextPlanNodeId(), ptNode,
+                                preferType, distributeExprs),
+                        childOutput.second);
+            }
             return Pair.of(
                     new LocalExchangeNode(translatorContext.nextPlanNodeId(), 
childOutput.first,
-                            preferType, 
getChildDistributeExprList(childIndex)),
+                            preferType, distributeExprs),
                     childOutput.second);
         }
         return childOutput;
@@ -1059,9 +1076,22 @@ public abstract class PlanNode extends 
TreeNode<PlanNode> {
         if (require.preferType() != LocalExchangeType.NOOP) {
             LocalExchangeType preferType = 
AddLocalExchange.resolveExchangeType(
                     require, translatorContext, this, childOutput.first);
+            List<Expr> distributeExprs = 
getChildDistributeExprList(childIndex);
+            // Heavy ops bottleneck avoidance (same as enforceChild):
+            // serial child + heavy exchange → insert PASSTHROUGH fan-out first
+            // Only for local-shuffle (pooling scan) fragments where serial 
means 1 task.
+            if (translatorContext.isLocalShuffleFragment()
+                    && preferType.isHeavyOperation() && 
childOutput.first.isSerialOperator()) {
+                PlanNode ptNode = new 
LocalExchangeNode(translatorContext.nextPlanNodeId(),
+                        childOutput.first, LocalExchangeType.PASSTHROUGH, 
null);
+                return Pair.of(
+                        new 
LocalExchangeNode(translatorContext.nextPlanNodeId(), ptNode,
+                                preferType, distributeExprs),
+                        childOutput.second);
+            }
             return Pair.of(
                     new LocalExchangeNode(translatorContext.nextPlanNodeId(), 
childOutput.first,
-                            preferType, 
getChildDistributeExprList(childIndex)),
+                            preferType, distributeExprs),
                     childOutput.second);
         }
         return childOutput;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
index 78f55ca165c..f7beb670c01 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
@@ -255,13 +255,11 @@ public class SortNode extends PlanNode {
 
         LocalExchangeTypeRequire requireChild;
         LocalExchangeType outputType = null;
-        if (children.get(0) instanceof AnalyticEvalNode) {
-            if (mergeByexchange) {
-                // Outer merge-sort above analytic: data is already 
sorted/partitioned by analytic,
-                // just need passthrough for merge. BE: 
SortSink._merge_by_exchange=true → PASSTHROUGH.
-                requireChild = LocalExchangeTypeRequire.requirePassthrough();
-                outputType = LocalExchangeType.PASSTHROUGH;
-            } else if (AddLocalExchange.isColocated(this)) {
+        if (isAnalyticSort) {
+            // BE: SortSink._is_analytic_sort=true → 
required_data_distribution() = HASH.
+            // This sort serves a parent AnalyticEvalNode (window function) 
and requires
+            // data partitioned by the analytic's partition exprs.
+            if (AddLocalExchange.isColocated(this)) {
                 requireChild = LocalExchangeTypeRequire.requireHash();
                 outputType = AddLocalExchange.resolveExchangeType(
                         LocalExchangeTypeRequire.requireHash(), 
translatorContext, this,
@@ -273,11 +271,8 @@ public class SortNode extends PlanNode {
             // BE: SortSink._merge_by_exchange=true → 
required_data_distribution() = PASSTHROUGH.
             requireChild = LocalExchangeTypeRequire.requirePassthrough();
             outputType = LocalExchangeType.PASSTHROUGH;
-        } else if 
(fragment.useSerialSource(translatorContext.getConnectContext())
-                && children.get(0) instanceof ScanNode) {
-            requireChild = LocalExchangeTypeRequire.requirePassthrough();
-            outputType = LocalExchangeType.PASSTHROUGH;
         } else {
+            // BE: else → NOOP
             requireChild = LocalExchangeTypeRequire.noRequire();
             outputType = LocalExchangeType.NOOP;
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
index ad1000c23da..2dfafd2ff6b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
@@ -126,8 +126,7 @@ public class ThriftPlansBuilder {
         for (PipelineDistributedPlan currentFragmentPlan : distributedPlans) {
             
sharedFileScanRangeParams.putAll(computeFileScanRangeParams(currentFragmentPlan));
 
-            Map<Integer, Integer> exchangeSenderNum = computeExchangeSenderNum(
-                    currentFragmentPlan, coordinatorContext.connectContext);
+            Map<Integer, Integer> exchangeSenderNum = 
computeExchangeSenderNum(currentFragmentPlan);
             ListMultimap<DistributedPlanWorker, AssignedJob> instancesPerWorker
                     = groupInstancePerWorker(currentFragmentPlan);
             Map<DistributedPlanWorker, TPipelineFragmentParams> 
workerToCurrentFragment = Maps.newLinkedHashMap();
@@ -259,50 +258,16 @@ public class ThriftPlansBuilder {
     }
 
     private static Map<Integer, Integer> computeExchangeSenderNum(
-            PipelineDistributedPlan distributedPlan, ConnectContext 
connectContext) {
+            PipelineDistributedPlan distributedPlan) {
         Map<Integer, Integer> senderNum = Maps.newLinkedHashMap();
         for (Entry<ExchangeNode, DistributedPlan> kv : 
distributedPlan.getInputs().entries()) {
             ExchangeNode exchangeNode = kv.getKey();
             PipelineDistributedPlan childPlan = (PipelineDistributedPlan) 
kv.getValue();
-            List<AssignedJob> childInstances = childPlan.getInstanceJobs();
-
-            // The receiver's stream_recvr num_senders must match the number 
of tasks in the
-            // sender fragment's output pipeline.
-            //
-            // Local shuffle has two shapes:
-            // 1. serial source -> local exchange -> parallel operators -> 
DataStreamSink
-            //    In this case all fragment instances still send data, so 
sender count must keep
-            //    `childInstances.size()`.
-            // 2. serial source -> serial output pipeline -> DataStreamSink
-            //    In this case only the first instance per worker sends data, 
so sender count must
-            //    collapse to distinct workers.
-            boolean useLocalShuffle = childInstances.stream()
-                    .anyMatch(LocalShuffleAssignedJob.class::isInstance);
-            int actualSenderCount;
-            if (useLocalShuffle && senderFragmentOutputsSerially(childPlan, 
connectContext)) {
-                actualSenderCount = (int) childInstances.stream()
-                        .map(AssignedJob::getAssignedWorker)
-                        .distinct()
-                        .count();
-            } else {
-                actualSenderCount = childInstances.size();
-            }
-
-            senderNum.merge(exchangeNode.getId().asInt(), actualSenderCount, 
Integer::sum);
+            senderNum.merge(exchangeNode.getId().asInt(), 
childPlan.getInstanceJobs().size(), Integer::sum);
         }
         return senderNum;
     }
 
-    private static boolean senderFragmentOutputsSerially(
-            PipelineDistributedPlan childPlan, ConnectContext connectContext) {
-        PlanFragment fragment = childPlan.getFragmentJob().getFragment();
-        PlanNode planRoot = fragment.getPlanRoot();
-        if (!fragment.useSerialSource(connectContext)) {
-            return false;
-        }
-        return planRoot.isSerialOperator() || !planRoot.hasSerialChildren();
-    }
-
     private static void 
setMultiCastDestinationThriftIfNotSet(PipelineDistributedPlan fragmentPlan) {
         MultiCastDataSink multiCastDataSink = (MultiCastDataSink) 
fragmentPlan.getFragmentJob().getFragment().getSink();
         List<List<TPlanFragmentDestination>> destinationList = 
multiCastDataSink.getDestinations();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to