This is an automated email from the ASF dual-hosted git repository.

924060929 pushed a commit to branch fe_local_shuffle_optimize
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 8459202a752c7a6f9b4a5a8a976f5bfc9d42e3d3
Author: 924060929 <[email protected]>
AuthorDate: Thu Jun 4 19:05:10 2026 +0800

    [fix](exchange) bucket-shuffle orphan instances must start receivers at EOS (DORIS-24902)
    
    With K-of-N bucket destination spread (FE planner), senders open one channel per
    destination entry (one per bucket), so a fragment instance owning no join bucket
    never gets a channel — and never gets EOS. Its VDataStreamRecvr waits for
    num_senders EOS forever and the whole fragment hangs (deterministic repro:
    13-bucket fact joined under parallel_pipeline_task_num=16 on 1 BE: 3 orphan
    instances, _num_remaining_senders=16, BE at 2% CPU).
    
    This is the same hang observed in the original DORIS-24902 experiments when
    spreading dests to bucket-owning instances; BE supported only the 1-receiver
    (serial) and all-N-receivers modes. This adds the K-of-N mode:
    
    - ExchangeSourceOperatorX learns the bucket->instance map
      (TPipelineFragmentParams.bucket_seq_to_instance_idx) at construction; an
      instance absent from the map values is a bucket-shuffle orphan.
    - ExchangeLocalState::create_stream_recvr starts orphan receivers with zero
      senders.
    - SenderQueue::set_dependency marks a zero-sender queue ready immediately:
      the usual set_ready fires only when decrement_senders reaches zero, which
      never happens for a queue born at zero.
    
    Safe for all modes: only instances that bucket-routed senders can never
    address (not in the map values) are affected, and for them EOS-at-birth can
    only unblock. Non-bucket exchanges and non-pooled bucket fragments (every
    instance owns buckets) are untouched.
---
 be/src/exec/exchange/vdata_stream_recvr.h          | 12 +++++++++++-
 be/src/exec/operator/exchange_source_operator.cpp  | 10 +++++++++-
 be/src/exec/operator/exchange_source_operator.h    | 22 ++++++++++++++++++++++
 be/src/exec/pipeline/pipeline_fragment_context.cpp | 11 +++++++++--
 4 files changed, 51 insertions(+), 4 deletions(-)

diff --git a/be/src/exec/exchange/vdata_stream_recvr.h b/be/src/exec/exchange/vdata_stream_recvr.h
index 2b395ee61f4..bde675cae9a 100644
--- a/be/src/exec/exchange/vdata_stream_recvr.h
+++ b/be/src/exec/exchange/vdata_stream_recvr.h
@@ -198,7 +198,17 @@ public:
 
     void close();
 
-    void set_dependency(std::shared_ptr<Dependency> dependency) { _source_dependency = dependency; }
+    void set_dependency(std::shared_ptr<Dependency> dependency) {
+        _source_dependency = dependency;
+        // A queue created with zero senders (bucket-shuffle orphan instance, see
+        // ExchangeLocalState::create_stream_recvr) never goes through decrement_senders,
+        // so the usual reached-zero set_ready never fires — mark it ready at wiring time
+        // or its task blocks forever on SHUFFLE_DATA_DEPENDENCY.
+        std::lock_guard<std::mutex> l(_lock);
+        if (_num_remaining_senders == 0) {
+            set_source_ready(l);
+        }
+    }
 
 protected:
     struct BlockItem;
diff --git a/be/src/exec/operator/exchange_source_operator.cpp b/be/src/exec/operator/exchange_source_operator.cpp
index e008d599078..8809e0002f9 100644
--- a/be/src/exec/operator/exchange_source_operator.cpp
+++ b/be/src/exec/operator/exchange_source_operator.cpp
@@ -64,9 +64,17 @@ std::string ExchangeSourceOperatorX::debug_string(int indentation_level) const {
 
 void ExchangeLocalState::create_stream_recvr(RuntimeState* state) {
     auto& p = _parent->cast<ExchangeSourceOperatorX>();
+    int num_senders = p.num_senders();
+    if (p.is_bucket_shuffle_orphan_instance(state->per_fragment_instance_idx())) {
+        // Bucket-routed senders open one channel per destination entry (one per bucket),
+        // so an instance owning no bucket never gets a channel — and never gets EOS.
+        // Start its receiver with zero senders so it reports EOS immediately instead of
+        // blocking forever (DORIS-24902 K-of-N destination spread).
+        num_senders = 0;
+    }
     stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
             state, _memory_used_counter, state->fragment_instance_id(), p.node_id(),
-            p.num_senders(), custom_profile(), p.is_merging(),
+            num_senders, custom_profile(), p.is_merging(),
             std::max(20480, config::exchg_node_buffer_size_bytes /
                                     (p.is_merging() ? p.num_senders() : 1)));
 }
diff --git a/be/src/exec/operator/exchange_source_operator.h b/be/src/exec/operator/exchange_source_operator.h
index d47250080a1..fb7598269e3 100644
--- a/be/src/exec/operator/exchange_source_operator.h
+++ b/be/src/exec/operator/exchange_source_operator.h
@@ -19,6 +19,9 @@
 
 #include <stdint.h>
 
+#include <map>
+#include <set>
+
 #include "exec/operator/operator.h"
 #include "exprs/vexpr_fwd.h"
 
@@ -110,6 +113,23 @@ public:
     [[nodiscard]] int num_senders() const { return _num_senders; }
     [[nodiscard]] bool is_merging() const { return _is_merging; }
 
+    // Instances that bucket-routed senders can address: values of the fragment's
+    // bucket_seq_to_instance_idx map. Senders open one channel per destination entry
+    // (one per bucket), so an instance owning no bucket never gets a channel — and
+    // never gets EOS. Such orphan instances must start their receiver with zero
+    // senders or they block forever (DORIS-24902 K-of-N destination spread).
+    void set_bucket_dest_instances(const std::map<int, int>& bucket_seq_to_instance_idx) {
+        for (const auto& [bucket_seq, instance_idx] : bucket_seq_to_instance_idx) {
+            _bucket_dest_instances.insert(instance_idx);
+        }
+        _has_bucket_dest_instances = true;
+    }
+
+    [[nodiscard]] bool is_bucket_shuffle_orphan_instance(int instance_idx) const {
+        return _partition_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED &&
+               _has_bucket_dest_instances && !_bucket_dest_instances.contains(instance_idx);
+    }
+
     DataDistribution required_data_distribution(RuntimeState* /*state*/) const override {
         if (OperatorX<ExchangeLocalState>::is_serial_operator()) {
             return {TLocalPartitionType::NOOP};
@@ -126,6 +146,8 @@ private:
     const int _num_senders;
     const bool _is_merging;
     const TPartitionType::type _partition_type;
+    std::set<int> _bucket_dest_instances;
+    bool _has_bucket_dest_instances = false;
 
     // use in merge sort
     size_t _offset;
diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp b/be/src/exec/pipeline/pipeline_fragment_context.cpp
index 05cf647a5be..644ff54696f 100644
--- a/be/src/exec/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp
@@ -1574,8 +1574,15 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
                                   ? _params.per_exch_num_senders.find(tnode.node_id)->second
                                   : 0;
         DCHECK_GT(num_senders, 0);
-        op = std::make_shared<ExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs,
-                                                       num_senders);
+        auto exchange_op = std::make_shared<ExchangeSourceOperatorX>(pool, tnode,
+                                                                     next_operator_id(), descs,
+                                                                     num_senders);
+        if (!_params.bucket_seq_to_instance_idx.empty()) {
+            // Lets bucket-routed exchanges detect orphan instances (owning no bucket) that
+            // no sender channel will ever address — their receivers must start at EOS.
+            exchange_op->set_bucket_dest_instances(_params.bucket_seq_to_instance_idx);
+        }
+        op = exchange_op;
         RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
         fe_with_old_version = !tnode.__isset.is_serial_operator;
         break;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to