This is an automated email from the ASF dual-hosted git repository. 924060929 pushed a commit to branch fe_local_shuffle_optimize_pr in repository https://gitbox.apache.org/repos/asf/doris.git
commit 1fd8ebfea29461971c22e65b127a520fce1840fb Author: 924060929 <[email protected]> AuthorDate: Wed Jun 24 17:45:51 2026 +0800 [opt](local shuffle) DORIS-24902 part1: decouple exchange serial + spread bucket-shuffle dests ExchangeNode serial decoupling: when the FE local-shuffle planner is enabled, ExchangeNode.isSerialOperatorOnBe no longer inherits fragment.hasSerialScanNode; serial status comes from the node itself only. Bucket-shuffle dest spreading: in pooled-scan fragments with the FE planner, DistributePlanner spreads destinations to all instances that own buckets (via assignedJoinBucketIndexes), instead of funneling to the first instance per worker. This eliminates the serial bottleneck diagnosed in DORIS-24902 (supplier domain exchange dests: 3 funneled → 24 spread). Serial exchange gate: dest spreading is skipped for serial exchanges where the exchange itself runs with 1 task (isSerialOperatorOnBe), preserving the funnel semantics that serial exchanges require. BE orphan instance fix: when dest spreading targets only bucket-owning instances, non-owning instances become orphans whose receivers never get data. Fixed by creating orphan recvrs with num_senders=0 (ready immediately at EOS) and using per-BE local task index for orphan detection (the prior per-fragment global index silently dropped entire buckets on multi-BE deployments). 
--- be/src/exec/exchange/vdata_stream_recvr.h | 12 ++++- be/src/exec/operator/exchange_source_operator.cpp | 11 ++++- be/src/exec/operator/exchange_source_operator.h | 36 ++++++++++++++ be/src/exec/pipeline/pipeline_fragment_context.cpp | 11 ++++- .../trees/plans/distribute/DistributePlanner.java | 57 +++++++++++++++++++++- .../org/apache/doris/planner/ExchangeNode.java | 7 +++ 6 files changed, 128 insertions(+), 6 deletions(-) diff --git a/be/src/exec/exchange/vdata_stream_recvr.h b/be/src/exec/exchange/vdata_stream_recvr.h index 2b395ee61f4..bde675cae9a 100644 --- a/be/src/exec/exchange/vdata_stream_recvr.h +++ b/be/src/exec/exchange/vdata_stream_recvr.h @@ -198,7 +198,17 @@ public: void close(); - void set_dependency(std::shared_ptr<Dependency> dependency) { _source_dependency = dependency; } + void set_dependency(std::shared_ptr<Dependency> dependency) { + _source_dependency = dependency; + // A queue created with zero senders (bucket-shuffle orphan instance, see + // ExchangeLocalState::create_stream_recvr) never goes through decrement_senders, + // so the usual reached-zero set_ready never fires — mark it ready at wiring time + // or its task blocks forever on SHUFFLE_DATA_DEPENDENCY. 
+ std::lock_guard<std::mutex> l(_lock); + if (_num_remaining_senders == 0) { + set_source_ready(l); + } + } protected: struct BlockItem; diff --git a/be/src/exec/operator/exchange_source_operator.cpp b/be/src/exec/operator/exchange_source_operator.cpp index 1e4d24de8e8..69b27536ca8 100644 --- a/be/src/exec/operator/exchange_source_operator.cpp +++ b/be/src/exec/operator/exchange_source_operator.cpp @@ -64,9 +64,17 @@ std::string ExchangeSourceOperatorX::debug_string(int indentation_level) const { void ExchangeLocalState::create_stream_recvr(RuntimeState* state) { auto& p = _parent->cast<ExchangeSourceOperatorX>(); + int num_senders = p.num_senders(); + if (p.is_bucket_shuffle_orphan_instance(local_task_idx)) { + // Bucket-routed senders open one channel per destination entry (one per bucket), + // so an instance owning no bucket never gets a channel — and never gets EOS. + // Start its receiver with zero senders so it reports EOS immediately instead of + // blocking forever (DORIS-24902 K-of-N destination spread). + num_senders = 0; + } stream_recvr = state->exec_env()->vstream_mgr()->create_recvr( state, _memory_used_counter, state->fragment_instance_id(), p.node_id(), - p.num_senders(), custom_profile(), p.is_merging(), + num_senders, custom_profile(), p.is_merging(), std::max(20480, config::exchg_node_buffer_size_bytes / (p.is_merging() ? 
p.num_senders() : 1))); } @@ -75,6 +83,7 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_init_timer); + local_task_idx = info.task_idx; create_stream_recvr(state); const auto& queues = stream_recvr->sender_queues(); deps.resize(queues.size()); diff --git a/be/src/exec/operator/exchange_source_operator.h b/be/src/exec/operator/exchange_source_operator.h index c6f651aaffd..40a11cc8a31 100644 --- a/be/src/exec/operator/exchange_source_operator.h +++ b/be/src/exec/operator/exchange_source_operator.h @@ -19,6 +19,9 @@ #include <stdint.h> +#include <map> +#include <set> + #include "exec/operator/operator.h" #include "exprs/vexpr_fwd.h" @@ -75,6 +78,9 @@ public: doris::VExprContextSPtrs ordering_expr_ctxs; int64_t num_rows_skipped; bool is_ready; + // per-BE local instance index (LocalStateInfo::task_idx), used for bucket-shuffle + // orphan detection in create_stream_recvr — see is_bucket_shuffle_orphan_instance. + int local_task_idx = 0; std::vector<std::shared_ptr<Dependency>> deps; @@ -110,6 +116,34 @@ public: [[nodiscard]] int num_senders() const { return _num_senders; } [[nodiscard]] bool is_merging() const { return _is_merging; } + // Instances that bucket-routed senders can address: values of the fragment's + // bucket_seq_to_instance_idx map. Senders open one channel per destination entry + // (one per bucket), so an instance owning no bucket never gets a channel — and + // never gets EOS. Such orphan instances must start their receiver with zero + // senders or they block forever (DORIS-24902 K-of-N destination spread). 
+ void set_bucket_dest_instances(const std::map<int, int>& bucket_seq_to_instance_idx) { + for (const auto& [bucket_seq, instance_idx] : bucket_seq_to_instance_idx) { + _bucket_dest_instances.insert(instance_idx); + } + _has_bucket_dest_instances = true; + } + + // local_task_idx is the per-BE local instance index (LocalStateInfo::task_idx) — the + // same numbering as bucket_seq_to_instance_idx values (built per worker on FE). Do NOT + // pass per_fragment_instance_idx here: that is sender_id = the GLOBAL index across all + // workers, which only coincides with the local index on the first worker (single-BE + // tests pass, multi-BE silently drops every later worker's buckets). + // + // Ownership-based orphan detection is only valid when destinations follow bucket + // ownership, i.e. the non-serial (FE planner dest spread) mode. A serial exchange's + // destinations funnel to the first instance per worker regardless of bucket ownership, + // and BE's serial-exchange mechanics already close the other receivers. 
+ [[nodiscard]] bool is_bucket_shuffle_orphan_instance(int local_task_idx) const { + return !is_serial_operator() && + _partition_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED && + _has_bucket_dest_instances && !_bucket_dest_instances.contains(local_task_idx); + } + DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { if (OperatorX<ExchangeLocalState>::is_serial_operator()) { return {TLocalPartitionType::NOOP}; @@ -126,6 +160,8 @@ private: const int _num_senders; const bool _is_merging; const TPartitionType::type _partition_type; + std::set<int> _bucket_dest_instances; + bool _has_bucket_dest_instances = false; // use in merge sort size_t _offset; diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp b/be/src/exec/pipeline/pipeline_fragment_context.cpp index d4f74843179..2736c9c542a 100644 --- a/be/src/exec/pipeline/pipeline_fragment_context.cpp +++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp @@ -1563,8 +1563,15 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo ? _params.per_exch_num_senders.find(tnode.node_id)->second : 0; DCHECK_GT(num_senders, 0); - op = std::make_shared<ExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs, - num_senders); + auto exchange_op = std::make_shared<ExchangeSourceOperatorX>(pool, tnode, + next_operator_id(), descs, + num_senders); + if (!_params.bucket_seq_to_instance_idx.empty()) { + // Lets bucket-routed exchanges detect orphan instances (owning no bucket) that + // no sender channel will ever address — their receivers must start at EOS. 
+ exchange_op->set_bucket_dest_instances(_params.bucket_seq_to_instance_idx); + } + op = exchange_op; RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); fe_with_old_version = !tnode.__isset.is_serial_operator; break; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java index 0069ddd37db..fd1ff52f5f9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java @@ -31,6 +31,7 @@ import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJobBui import org.apache.doris.nereids.trees.plans.distribute.worker.job.BucketScanSource; import org.apache.doris.nereids.trees.plans.distribute.worker.job.DefaultScanSource; import org.apache.doris.nereids.trees.plans.distribute.worker.job.LocalShuffleAssignedJob; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.LocalShuffleBucketJoinAssignedJob; import org.apache.doris.nereids.trees.plans.distribute.worker.job.StaticAssignedJob; import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJob; import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJobBuilder; @@ -211,7 +212,7 @@ public class DistributePlanner { List<AssignedJob> receiverInstances = filterInstancesWhichCanReceiveDataFromRemote( receiverPlan, enableShareHashTableForBroadcastJoin, linkNode); if (linkNode.getPartitionType() == TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED) { - receiverInstances = getDestinationsByBuckets(receiverPlan, receiverInstances); + receiverInstances = getDestinationsByBuckets(receiverPlan, receiverInstances, linkNode); } DataSink sink = senderPlan.getFragmentJob().getFragment().getSink(); @@ -231,12 +232,34 @@ public class DistributePlanner { private 
List<AssignedJob> getDestinationsByBuckets( PipelineDistributedPlan joinSide, - List<AssignedJob> receiverInstances) { + List<AssignedJob> receiverInstances, + ExchangeNode linkNode) { UnassignedScanBucketOlapTableJob bucketJob = (UnassignedScanBucketOlapTableJob) joinSide.getFragmentJob(); int bucketNum = bucketJob.getOlapScanNodes().get(0).getBucketNum(); + // The spread is only valid for a NON-serial exchange: a serial exchange + // (use_serial_exchange / UNPARTITIONED) receives through one task per worker and + // expects funnel destinations; spreading them loses every row addressed to a + // non-first instance. Mirrors the !is_serial_operator() gate on the BE orphan + // receiver fix. + if (isEnableLocalShufflePlanner() + && !linkNode.isSerialOperatorOnBe(statementContext.getConnectContext()) + && !joinSide.getInstanceJobs().isEmpty() + && joinSide.getInstanceJobs().stream() + .allMatch(LocalShuffleBucketJoinAssignedJob.class::isInstance)) { + // When FE local shuffle planner is on, spread bucket destinations across all pooled + // instances by their assigned join buckets — the same bucket -> instance mapping as + // bucket_seq_to_instance_id sent to BE — instead of funneling every bucket of a worker + // into its first instance and relying on BE local exchange to fan out. 
+ return sortDestinationInstancesByJoinBuckets(joinSide, bucketNum); + } return sortDestinationInstancesByBuckets(joinSide, receiverInstances, bucketNum); } + private boolean isEnableLocalShufflePlanner() { + ConnectContext connectContext = statementContext.getConnectContext(); + return connectContext != null && connectContext.getSessionVariable().isEnableLocalShufflePlanner(); + } + private List<AssignedJob> filterInstancesWhichCanReceiveDataFromRemote( PipelineDistributedPlan receiverPlan, boolean enableShareHashTableForBroadcastJoin, @@ -252,6 +275,36 @@ public class DistributePlanner { } } + private List<AssignedJob> sortDestinationInstancesByJoinBuckets( + PipelineDistributedPlan plan, int bucketNum) { + AssignedJob[] instances = new AssignedJob[bucketNum]; + for (AssignedJob instanceJob : plan.getInstanceJobs()) { + LocalShuffleBucketJoinAssignedJob localShuffleJob = (LocalShuffleBucketJoinAssignedJob) instanceJob; + for (Integer bucketIndex : localShuffleJob.getAssignedJoinBucketIndexes()) { + if (instances[bucketIndex] != null) { + throw new IllegalStateException( + "Multi instances assigned same join bucket: " + instances[bucketIndex] + + " and " + instanceJob + ); + } + instances[bucketIndex] = instanceJob; + } + } + + for (int i = 0; i < instances.length; i++) { + if (instances[i] == null) { + instances[i] = new StaticAssignedJob( + i, + new TUniqueId(-1, -1), + plan.getFragmentJob(), + DummyWorker.INSTANCE, + new DefaultScanSource(ImmutableMap.of()) + ); + } + } + return Arrays.asList(instances); + } + private List<AssignedJob> sortDestinationInstancesByBuckets( PipelineDistributedPlan plan, List<AssignedJob> unsorted, int bucketNum) { AssignedJob[] instances = new AssignedJob[bucketNum]; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java index 256f89843eb..8898987d9a7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java @@ -165,6 +165,13 @@ public class ExchangeNode extends PlanNode { @Override public boolean isSerialOperatorOnBe(ConnectContext context) { + if (context != null && context.getSessionVariable().isEnableLocalShufflePlanner()) { + // When FE local shuffle planner is on, decouple exchange from scan's serial flag. + // Scan pooling is handled by LE(PT) after scan; exchange keeps its own parallelism. + return fragment != null + && isSerialNode() + && fragment.useSerialSource(context); + } return fragment != null && (isSerialNode() || fragment.hasSerialScanNode()) && fragment.useSerialSource(context); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org
