This is an automated email from the ASF dual-hosted git repository. 924060929 pushed a commit to branch fe_local_shuffle_optimize in repository https://gitbox.apache.org/repos/asf/doris.git
commit 8459202a752c7a6f9b4a5a8a976f5bfc9d42e3d3
Author: 924060929 <[email protected]>
AuthorDate: Thu Jun 4 19:05:10 2026 +0800

    [fix](exchange) bucket-shuffle orphan instances must start receivers at EOS (DORIS-24902)

    With K-of-N bucket destination spread (FE planner), senders open one channel per destination entry (one per bucket), so a fragment instance owning no join bucket never gets a channel — and never gets EOS. Its VDataStreamRecvr waits for num_senders EOS forever and the whole fragment hangs (deterministic repro: 13-bucket fact joined under parallel_pipeline_task_num=16 on 1 BE: 3 orphan instances, _num_remaining_senders=16, BE at 2% CPU).

    This is the same hang observed in the original DORIS-24902 experiments when spreading destinations to bucket-owning instances; BE supported only the 1-receiver (serial) and all-N-receivers modes. This adds the K-of-N mode:

    - ExchangeSourceOperatorX learns the bucket->instance map (TPipelineFragmentParams.bucket_seq_to_instance_idx) at construction; an instance absent from the map values is a bucket-shuffle orphan.
    - ExchangeLocalState::create_stream_recvr starts orphan receivers with zero senders.
    - SenderQueue::set_dependency marks a zero-sender queue ready immediately: the usual set_ready fires only when decrement_senders reaches zero, which never happens for a queue born at zero.

    Safe for all modes: only instances that bucket-routed senders can never address (not in the map values) are affected, and for them EOS-at-birth can only unblock. Non-bucket exchanges and non-pooled bucket fragments (every instance owns buckets) are untouched.
--- be/src/exec/exchange/vdata_stream_recvr.h | 12 +++++++++++- be/src/exec/operator/exchange_source_operator.cpp | 10 +++++++++- be/src/exec/operator/exchange_source_operator.h | 22 ++++++++++++++++++++++ be/src/exec/pipeline/pipeline_fragment_context.cpp | 11 +++++++++-- 4 files changed, 51 insertions(+), 4 deletions(-) diff --git a/be/src/exec/exchange/vdata_stream_recvr.h b/be/src/exec/exchange/vdata_stream_recvr.h index 2b395ee61f4..bde675cae9a 100644 --- a/be/src/exec/exchange/vdata_stream_recvr.h +++ b/be/src/exec/exchange/vdata_stream_recvr.h @@ -198,7 +198,17 @@ public: void close(); - void set_dependency(std::shared_ptr<Dependency> dependency) { _source_dependency = dependency; } + void set_dependency(std::shared_ptr<Dependency> dependency) { + _source_dependency = dependency; + // A queue created with zero senders (bucket-shuffle orphan instance, see + // ExchangeLocalState::create_stream_recvr) never goes through decrement_senders, + // so the usual reached-zero set_ready never fires — mark it ready at wiring time + // or its task blocks forever on SHUFFLE_DATA_DEPENDENCY. 
+ std::lock_guard<std::mutex> l(_lock); + if (_num_remaining_senders == 0) { + set_source_ready(l); + } + } protected: struct BlockItem; diff --git a/be/src/exec/operator/exchange_source_operator.cpp b/be/src/exec/operator/exchange_source_operator.cpp index e008d599078..8809e0002f9 100644 --- a/be/src/exec/operator/exchange_source_operator.cpp +++ b/be/src/exec/operator/exchange_source_operator.cpp @@ -64,9 +64,17 @@ std::string ExchangeSourceOperatorX::debug_string(int indentation_level) const { void ExchangeLocalState::create_stream_recvr(RuntimeState* state) { auto& p = _parent->cast<ExchangeSourceOperatorX>(); + int num_senders = p.num_senders(); + if (p.is_bucket_shuffle_orphan_instance(state->per_fragment_instance_idx())) { + // Bucket-routed senders open one channel per destination entry (one per bucket), + // so an instance owning no bucket never gets a channel — and never gets EOS. + // Start its receiver with zero senders so it reports EOS immediately instead of + // blocking forever (DORIS-24902 K-of-N destination spread). + num_senders = 0; + } stream_recvr = state->exec_env()->vstream_mgr()->create_recvr( state, _memory_used_counter, state->fragment_instance_id(), p.node_id(), - p.num_senders(), custom_profile(), p.is_merging(), + num_senders, custom_profile(), p.is_merging(), std::max(20480, config::exchg_node_buffer_size_bytes / (p.is_merging() ? 
p.num_senders() : 1))); } diff --git a/be/src/exec/operator/exchange_source_operator.h b/be/src/exec/operator/exchange_source_operator.h index d47250080a1..fb7598269e3 100644 --- a/be/src/exec/operator/exchange_source_operator.h +++ b/be/src/exec/operator/exchange_source_operator.h @@ -19,6 +19,9 @@ #include <stdint.h> +#include <map> +#include <set> + #include "exec/operator/operator.h" #include "exprs/vexpr_fwd.h" @@ -110,6 +113,23 @@ public: [[nodiscard]] int num_senders() const { return _num_senders; } [[nodiscard]] bool is_merging() const { return _is_merging; } + // Instances that bucket-routed senders can address: values of the fragment's + // bucket_seq_to_instance_idx map. Senders open one channel per destination entry + // (one per bucket), so an instance owning no bucket never gets a channel — and + // never gets EOS. Such orphan instances must start their receiver with zero + // senders or they block forever (DORIS-24902 K-of-N destination spread). + void set_bucket_dest_instances(const std::map<int, int>& bucket_seq_to_instance_idx) { + for (const auto& [bucket_seq, instance_idx] : bucket_seq_to_instance_idx) { + _bucket_dest_instances.insert(instance_idx); + } + _has_bucket_dest_instances = true; + } + + [[nodiscard]] bool is_bucket_shuffle_orphan_instance(int instance_idx) const { + return _partition_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED && + _has_bucket_dest_instances && !_bucket_dest_instances.contains(instance_idx); + } + DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { if (OperatorX<ExchangeLocalState>::is_serial_operator()) { return {TLocalPartitionType::NOOP}; @@ -126,6 +146,8 @@ private: const int _num_senders; const bool _is_merging; const TPartitionType::type _partition_type; + std::set<int> _bucket_dest_instances; + bool _has_bucket_dest_instances = false; // use in merge sort size_t _offset; diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp 
b/be/src/exec/pipeline/pipeline_fragment_context.cpp index 05cf647a5be..644ff54696f 100644 --- a/be/src/exec/pipeline/pipeline_fragment_context.cpp +++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp @@ -1574,8 +1574,15 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo ? _params.per_exch_num_senders.find(tnode.node_id)->second : 0; DCHECK_GT(num_senders, 0); - op = std::make_shared<ExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs, - num_senders); + auto exchange_op = std::make_shared<ExchangeSourceOperatorX>(pool, tnode, + next_operator_id(), descs, + num_senders); + if (!_params.bucket_seq_to_instance_idx.empty()) { + // Lets bucket-routed exchanges detect orphan instances (owning no bucket) that + // no sender channel will ever address — their receivers must start at EOS. + exchange_op->set_bucket_dest_instances(_params.bucket_seq_to_instance_idx); + } + op = exchange_op; RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); fe_with_old_version = !tnode.__isset.is_serial_operator; break; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
