This is an automated email from the ASF dual-hosted git repository.

924060929 pushed a commit to branch fe_local_shuffle_optimize_pr
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 1fd8ebfea29461971c22e65b127a520fce1840fb
Author: 924060929 <[email protected]>
AuthorDate: Wed Jun 24 17:45:51 2026 +0800

    [opt](local shuffle) DORIS-24902 part1: decouple exchange serial + spread bucket-shuffle dests
    
    ExchangeNode serial decoupling: when the FE local-shuffle planner is enabled,
    ExchangeNode.isSerialOperatorOnBe no longer inherits fragment.hasSerialScanNode;
    serial status comes from the node itself only.
    
    Bucket-shuffle dest spreading: in pooled-scan fragments with the FE planner,
    DistributePlanner spreads destinations to all instances that own buckets (via
    assignedJoinBucketIndexes), instead of funneling to the first instance per worker.
    This eliminates the serial bottleneck diagnosed in DORIS-24902 (supplier domain
    exchange dests: 3 funneled → 24 spread).
    
    Serial exchange gate: dest spreading is skipped for serial exchanges where the
    exchange itself runs with 1 task (isSerialOperatorOnBe), preserving the funnel
    semantics that serial exchanges require.
    
    BE orphan instance fix: when dest spreading targets only bucket-owning instances,
    non-owning instances become orphans whose receivers never get data. Fixed by
    creating orphan recvrs with num_senders=0 (ready immediately at EOS) and using
    per-BE local task index for orphan detection (the prior per-fragment global index
    silently dropped entire buckets on multi-BE deployments).
---
 be/src/exec/exchange/vdata_stream_recvr.h          | 12 ++++-
 be/src/exec/operator/exchange_source_operator.cpp  | 11 ++++-
 be/src/exec/operator/exchange_source_operator.h    | 36 ++++++++++++++
 be/src/exec/pipeline/pipeline_fragment_context.cpp | 11 ++++-
 .../trees/plans/distribute/DistributePlanner.java  | 57 +++++++++++++++++++++-
 .../org/apache/doris/planner/ExchangeNode.java     |  7 +++
 6 files changed, 128 insertions(+), 6 deletions(-)

diff --git a/be/src/exec/exchange/vdata_stream_recvr.h 
b/be/src/exec/exchange/vdata_stream_recvr.h
index 2b395ee61f4..bde675cae9a 100644
--- a/be/src/exec/exchange/vdata_stream_recvr.h
+++ b/be/src/exec/exchange/vdata_stream_recvr.h
@@ -198,7 +198,17 @@ public:
 
     void close();
 
-    void set_dependency(std::shared_ptr<Dependency> dependency) { 
_source_dependency = dependency; }
+    void set_dependency(std::shared_ptr<Dependency> dependency) {
+        _source_dependency = dependency;
+        // A queue created with zero senders (bucket-shuffle orphan instance, 
see
+        // ExchangeLocalState::create_stream_recvr) never goes through 
decrement_senders,
+        // so the usual reached-zero set_ready never fires — mark it ready at 
wiring time
+        // or its task blocks forever on SHUFFLE_DATA_DEPENDENCY.
+        std::lock_guard<std::mutex> l(_lock);
+        if (_num_remaining_senders == 0) {
+            set_source_ready(l);
+        }
+    }
 
 protected:
     struct BlockItem;
diff --git a/be/src/exec/operator/exchange_source_operator.cpp 
b/be/src/exec/operator/exchange_source_operator.cpp
index 1e4d24de8e8..69b27536ca8 100644
--- a/be/src/exec/operator/exchange_source_operator.cpp
+++ b/be/src/exec/operator/exchange_source_operator.cpp
@@ -64,9 +64,17 @@ std::string ExchangeSourceOperatorX::debug_string(int 
indentation_level) const {
 
 void ExchangeLocalState::create_stream_recvr(RuntimeState* state) {
     auto& p = _parent->cast<ExchangeSourceOperatorX>();
+    int num_senders = p.num_senders();
+    if (p.is_bucket_shuffle_orphan_instance(local_task_idx)) {
+        // Bucket-routed senders open one channel per destination entry (one 
per bucket),
+        // so an instance owning no bucket never gets a channel — and never 
gets EOS.
+        // Start its receiver with zero senders so it reports EOS immediately 
instead of
+        // blocking forever (DORIS-24902 K-of-N destination spread).
+        num_senders = 0;
+    }
     stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
             state, _memory_used_counter, state->fragment_instance_id(), 
p.node_id(),
-            p.num_senders(), custom_profile(), p.is_merging(),
+            num_senders, custom_profile(), p.is_merging(),
             std::max(20480, config::exchg_node_buffer_size_bytes /
                                     (p.is_merging() ? p.num_senders() : 1)));
 }
@@ -75,6 +83,7 @@ Status ExchangeLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
     RETURN_IF_ERROR(Base::init(state, info));
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_init_timer);
+    local_task_idx = info.task_idx;
     create_stream_recvr(state);
     const auto& queues = stream_recvr->sender_queues();
     deps.resize(queues.size());
diff --git a/be/src/exec/operator/exchange_source_operator.h 
b/be/src/exec/operator/exchange_source_operator.h
index c6f651aaffd..40a11cc8a31 100644
--- a/be/src/exec/operator/exchange_source_operator.h
+++ b/be/src/exec/operator/exchange_source_operator.h
@@ -19,6 +19,9 @@
 
 #include <stdint.h>
 
+#include <map>
+#include <set>
+
 #include "exec/operator/operator.h"
 #include "exprs/vexpr_fwd.h"
 
@@ -75,6 +78,9 @@ public:
     doris::VExprContextSPtrs ordering_expr_ctxs;
     int64_t num_rows_skipped;
     bool is_ready;
+    // per-BE local instance index (LocalStateInfo::task_idx), used for 
bucket-shuffle
+    // orphan detection in create_stream_recvr — see 
is_bucket_shuffle_orphan_instance.
+    int local_task_idx = 0;
 
     std::vector<std::shared_ptr<Dependency>> deps;
 
@@ -110,6 +116,34 @@ public:
     [[nodiscard]] int num_senders() const { return _num_senders; }
     [[nodiscard]] bool is_merging() const { return _is_merging; }
 
+    // Instances that bucket-routed senders can address: values of the 
fragment's
+    // bucket_seq_to_instance_idx map. Senders open one channel per 
destination entry
+    // (one per bucket), so an instance owning no bucket never gets a channel 
— and
+    // never gets EOS. Such orphan instances must start their receiver with 
zero
+    // senders or they block forever (DORIS-24902 K-of-N destination spread).
+    void set_bucket_dest_instances(const std::map<int, int>& 
bucket_seq_to_instance_idx) {
+        for (const auto& [bucket_seq, instance_idx] : 
bucket_seq_to_instance_idx) {
+            _bucket_dest_instances.insert(instance_idx);
+        }
+        _has_bucket_dest_instances = true;
+    }
+
+    // local_task_idx is the per-BE local instance index 
(LocalStateInfo::task_idx) — the
+    // same numbering as bucket_seq_to_instance_idx values (built per worker 
on FE). Do NOT
+    // pass per_fragment_instance_idx here: that is sender_id = the GLOBAL 
index across all
+    // workers, which only coincides with the local index on the first worker 
(single-BE
+    // tests pass, multi-BE silently drops every later worker's buckets).
+    //
+    // Ownership-based orphan detection is only valid when destinations follow 
bucket
+    // ownership, i.e. the non-serial (FE planner dest spread) mode. A serial 
exchange's
+    // destinations funnel to the first instance per worker regardless of 
bucket ownership,
+    // and BE's serial-exchange mechanics already close the other receivers.
+    [[nodiscard]] bool is_bucket_shuffle_orphan_instance(int local_task_idx) 
const {
+        return !is_serial_operator() &&
+               _partition_type == 
TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED &&
+               _has_bucket_dest_instances && 
!_bucket_dest_instances.contains(local_task_idx);
+    }
+
     DataDistribution required_data_distribution(RuntimeState* /*state*/) const 
override {
         if (OperatorX<ExchangeLocalState>::is_serial_operator()) {
             return {TLocalPartitionType::NOOP};
@@ -126,6 +160,8 @@ private:
     const int _num_senders;
     const bool _is_merging;
     const TPartitionType::type _partition_type;
+    std::set<int> _bucket_dest_instances;
+    bool _has_bucket_dest_instances = false;
 
     // use in merge sort
     size_t _offset;
diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp 
b/be/src/exec/pipeline/pipeline_fragment_context.cpp
index d4f74843179..2736c9c542a 100644
--- a/be/src/exec/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp
@@ -1563,8 +1563,15 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
                                   ? 
_params.per_exch_num_senders.find(tnode.node_id)->second
                                   : 0;
         DCHECK_GT(num_senders, 0);
-        op = std::make_shared<ExchangeSourceOperatorX>(pool, tnode, 
next_operator_id(), descs,
-                                                       num_senders);
+        auto exchange_op = std::make_shared<ExchangeSourceOperatorX>(pool, 
tnode,
+                                                                     
next_operator_id(), descs,
+                                                                     
num_senders);
+        if (!_params.bucket_seq_to_instance_idx.empty()) {
+            // Lets bucket-routed exchanges detect orphan instances (owning no 
bucket) that
+            // no sender channel will ever address — their receivers must 
start at EOS.
+            
exchange_op->set_bucket_dest_instances(_params.bucket_seq_to_instance_idx);
+        }
+        op = exchange_op;
         RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
         fe_with_old_version = !tnode.__isset.is_serial_operator;
         break;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
index 0069ddd37db..fd1ff52f5f9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
@@ -31,6 +31,7 @@ import 
org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJobBui
 import 
org.apache.doris.nereids.trees.plans.distribute.worker.job.BucketScanSource;
 import 
org.apache.doris.nereids.trees.plans.distribute.worker.job.DefaultScanSource;
 import 
org.apache.doris.nereids.trees.plans.distribute.worker.job.LocalShuffleAssignedJob;
+import 
org.apache.doris.nereids.trees.plans.distribute.worker.job.LocalShuffleBucketJoinAssignedJob;
 import 
org.apache.doris.nereids.trees.plans.distribute.worker.job.StaticAssignedJob;
 import 
org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJob;
 import 
org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJobBuilder;
@@ -211,7 +212,7 @@ public class DistributePlanner {
         List<AssignedJob> receiverInstances = 
filterInstancesWhichCanReceiveDataFromRemote(
                 receiverPlan, enableShareHashTableForBroadcastJoin, linkNode);
         if (linkNode.getPartitionType() == 
TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED) {
-            receiverInstances = getDestinationsByBuckets(receiverPlan, 
receiverInstances);
+            receiverInstances = getDestinationsByBuckets(receiverPlan, 
receiverInstances, linkNode);
         }
 
         DataSink sink = senderPlan.getFragmentJob().getFragment().getSink();
@@ -231,12 +232,34 @@ public class DistributePlanner {
 
     private List<AssignedJob> getDestinationsByBuckets(
             PipelineDistributedPlan joinSide,
-            List<AssignedJob> receiverInstances) {
+            List<AssignedJob> receiverInstances,
+            ExchangeNode linkNode) {
         UnassignedScanBucketOlapTableJob bucketJob = 
(UnassignedScanBucketOlapTableJob) joinSide.getFragmentJob();
         int bucketNum = bucketJob.getOlapScanNodes().get(0).getBucketNum();
+        // The spread is only valid for a NON-serial exchange: a serial 
exchange
+        // (use_serial_exchange / UNPARTITIONED) receives through one task per 
worker and
+        // expects funnel destinations; spreading them loses every row 
addressed to a
+        // non-first instance. Mirrors the !is_serial_operator() gate on the 
BE orphan
+        // receiver fix.
+        if (isEnableLocalShufflePlanner()
+                && 
!linkNode.isSerialOperatorOnBe(statementContext.getConnectContext())
+                && !joinSide.getInstanceJobs().isEmpty()
+                && joinSide.getInstanceJobs().stream()
+                        
.allMatch(LocalShuffleBucketJoinAssignedJob.class::isInstance)) {
+            // When FE local shuffle planner is on, spread bucket destinations 
across all pooled
+            // instances by their assigned join buckets — the same bucket -> 
instance mapping as
+            // bucket_seq_to_instance_idx sent to BE — instead of funneling every bucket of a worker
+            // into its first instance and relying on BE local exchange to fan 
out.
+            return sortDestinationInstancesByJoinBuckets(joinSide, bucketNum);
+        }
         return sortDestinationInstancesByBuckets(joinSide, receiverInstances, 
bucketNum);
     }
 
+    private boolean isEnableLocalShufflePlanner() {
+        ConnectContext connectContext = statementContext.getConnectContext();
+        return connectContext != null && 
connectContext.getSessionVariable().isEnableLocalShufflePlanner();
+    }
+
     private List<AssignedJob> filterInstancesWhichCanReceiveDataFromRemote(
             PipelineDistributedPlan receiverPlan,
             boolean enableShareHashTableForBroadcastJoin,
@@ -252,6 +275,36 @@ public class DistributePlanner {
         }
     }
 
+    private List<AssignedJob> sortDestinationInstancesByJoinBuckets(
+            PipelineDistributedPlan plan, int bucketNum) {
+        AssignedJob[] instances = new AssignedJob[bucketNum];
+        for (AssignedJob instanceJob : plan.getInstanceJobs()) {
+            LocalShuffleBucketJoinAssignedJob localShuffleJob = 
(LocalShuffleBucketJoinAssignedJob) instanceJob;
+            for (Integer bucketIndex : 
localShuffleJob.getAssignedJoinBucketIndexes()) {
+                if (instances[bucketIndex] != null) {
+                    throw new IllegalStateException(
+                            "Multi instances assigned same join bucket: " + 
instances[bucketIndex]
+                                    + " and " + instanceJob
+                    );
+                }
+                instances[bucketIndex] = instanceJob;
+            }
+        }
+
+        for (int i = 0; i < instances.length; i++) {
+            if (instances[i] == null) {
+                instances[i] = new StaticAssignedJob(
+                        i,
+                        new TUniqueId(-1, -1),
+                        plan.getFragmentJob(),
+                        DummyWorker.INSTANCE,
+                        new DefaultScanSource(ImmutableMap.of())
+                );
+            }
+        }
+        return Arrays.asList(instances);
+    }
+
     private List<AssignedJob> sortDestinationInstancesByBuckets(
             PipelineDistributedPlan plan, List<AssignedJob> unsorted, int 
bucketNum) {
         AssignedJob[] instances = new AssignedJob[bucketNum];
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
index 256f89843eb..8898987d9a7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
@@ -165,6 +165,13 @@ public class ExchangeNode extends PlanNode {
 
     @Override
     public boolean isSerialOperatorOnBe(ConnectContext context) {
+        if (context != null && 
context.getSessionVariable().isEnableLocalShufflePlanner()) {
+            // When FE local shuffle planner is on, decouple exchange from 
scan's serial flag.
+            // Scan pooling is handled by LE(PT) after scan; exchange keeps 
its own parallelism.
+            return fragment != null
+                    && isSerialNode()
+                    && fragment.useSerialSource(context);
+        }
         return fragment != null
                 && (isSerialNode() || fragment.hasSerialScanNode())
                 && fragment.useSerialSource(context);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to