This is an automated email from the ASF dual-hosted git repository.
huajianlan pushed a commit to branch fe_local_shuffle
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/fe_local_shuffle by this push:
new f9aa0f8f5ea [fix](local shuffle) fix num_senders for UNPARTITIONED sender fragments
f9aa0f8f5ea is described below
commit f9aa0f8f5ea40f51715894dae5e68bad14acc22e
Author: 924060929 <[email protected]>
AuthorDate: Mon Mar 30 12:33:50 2026 +0800
[fix](local shuffle) fix num_senders for UNPARTITIONED sender fragments
Fix a deadlock that occurs when the FE-planned local exchange inserts a
PASSTHROUGH after a serial Exchange in an UNPARTITIONED fragment. The
UNPARTITIONED fragment runs on only 1 worker, but
ThriftPlansBuilder.senderFragmentOutputsSerially() did not recognize it
as producing serial output (useSerialSource() returned false for
fragments without scan nodes). As a result, num_senders was set to
instanceJobs.size() (e.g., 3-4) instead of the actual number of distinct
workers (1), so the receiving Exchange waited for senders that do not
exist.
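To see why an inflated num_senders hangs the query instead of failing it,
here is a minimal Java sketch, assuming the receiving Exchange finishes
only after it has received one end-of-stream (EOS) signal per expected
sender. ExchangeReceiverSketch and its methods are hypothetical, not
Doris BE code; the timeout exists only so the demo terminates, whereas a
receiver without one would keep waiting.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical model of a receiving Exchange: it finishes only after it has
// seen one end-of-stream (EOS) signal from each of the numSenders it expects.
final class ExchangeReceiverSketch {
    private final CountDownLatch remainingSenders;

    ExchangeReceiverSketch(int numSenders) {
        this.remainingSenders = new CountDownLatch(numSenders);
    }

    // Called once by each sender when it has no more data to send.
    void onEos() {
        remainingSenders.countDown();
    }

    // Waits up to timeoutMs for every expected sender; false means "still waiting".
    boolean awaitAllSenders(long timeoutMs) {
        try {
            return remainingSenders.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // UNPARTITIONED sender fragment: only one worker actually sends data.
        int actualSenders = 1;

        ExchangeReceiverSketch buggy = new ExchangeReceiverSketch(3); // num_senders = instanceJobs.size()
        ExchangeReceiverSketch fixed = new ExchangeReceiverSketch(1); // num_senders = distinct workers
        for (int i = 0; i < actualSenders; i++) {
            buggy.onEos();
            fixed.onEos();
        }
        System.out.println("buggy finished: " + buggy.awaitAllSenders(100));
        System.out.println("fixed finished: " + fixed.awaitAllSenders(100));
    }
}
```

Running main prints "buggy finished: false" followed by "fixed finished:
true": the receiver that expects 3 senders is still waiting for 2 EOS
signals that will never arrive.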
Fix: in senderFragmentOutputsSerially(), treat UNPARTITIONED fragments as
always producing serial output, since they run on a single worker.
Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
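The num_senders mismatch can be modelled with a small Java sketch,
assuming the receiver expects one sender per distinct worker when the
sender fragment outputs serially and one sender per instance job
otherwise, which is how the commit message describes the two values.
NumSendersSketch, numSenders() and the worker names are hypothetical
illustrations, not the actual ThriftPlansBuilder code.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of deriving a receiver's num_senders from the sender
// fragment's instance jobs. Names are illustrative, not Doris APIs.
final class NumSendersSketch {

    // instanceWorkers has one entry per instance job: the worker it runs on.
    static int numSenders(boolean senderOutputsSerially, List<String> instanceWorkers) {
        if (senderOutputsSerially) {
            // Assumed: with serial output each worker contributes a single
            // sender, so count distinct workers rather than instances.
            Set<String> distinctWorkers = new HashSet<>(instanceWorkers);
            return distinctWorkers.size();
        }
        // Parallel output: every instance job sends its own stream.
        return instanceWorkers.size();
    }

    public static void main(String[] args) {
        // An UNPARTITIONED fragment: 3 instance jobs, all on the same worker.
        List<String> workers = List.of("worker-1", "worker-1", "worker-1");
        System.out.println(numSenders(false, workers)); // 3: not recognized as serial (before the fix)
        System.out.println(numSenders(true, workers));  // 1: recognized as serial (after the fix)
    }
}
```

The only input the patch changes is senderOutputsSerially: once
UNPARTITIONED fragments report true, the count collapses from the number
of instance jobs to the single worker they run on.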
---
.../org/apache/doris/qe/runtime/ThriftPlansBuilder.java | 17 ++++++++++++++---
1 file changed, 14 insertions(+), 3 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
index ad1000c23da..aefaef2dc03 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
@@ -297,10 +297,21 @@ public class ThriftPlansBuilder {
PipelineDistributedPlan childPlan, ConnectContext connectContext) {
PlanFragment fragment = childPlan.getFragmentJob().getFragment();
PlanNode planRoot = fragment.getPlanRoot();
- if (!fragment.useSerialSource(connectContext)) {
- return false;
+ // A fragment outputs serially if its output pipeline has only 1 task.
+ // This happens when:
+ // 1. The fragment uses serial source (pooling scan) AND the plan root
+ // is serial or has no serial children (no local exchange fan-out)
+ // 2. The fragment's data partition is UNPARTITIONED — it runs on only
+ // 1 worker and always outputs serially regardless of local exchange
+ if (fragment.getDataPartition().isPartitioned()) {
+ if (!fragment.useSerialSource(connectContext)) {
+ return false;
+ }
+ return planRoot.isSerialOperator() || !planRoot.hasSerialChildren();
+ } else {
+ // UNPARTITIONED fragment: only 1 worker, outputs serially
+ return true;
}
- return planRoot.isSerialOperator() || !planRoot.hasSerialChildren();
}

private static void setMultiCastDestinationThriftIfNotSet(PipelineDistributedPlan fragmentPlan) {
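For checking the case analysis in the patch, here is a plain-boolean Java
model of the patched senderFragmentOutputsSerially(). It is a sketch: the
four parameters are hypothetical stand-ins for
fragment.getDataPartition().isPartitioned(),
fragment.useSerialSource(connectContext), planRoot.isSerialOperator() and
planRoot.hasSerialChildren().

```java
// Plain-boolean model of the patched senderFragmentOutputsSerially().
// The parameters are hypothetical stand-ins for the PlanFragment/PlanNode calls.
final class OutputsSeriallySketch {

    static boolean outputsSerially(boolean isPartitioned,
                                   boolean useSerialSource,
                                   boolean rootIsSerialOperator,
                                   boolean rootHasSerialChildren) {
        if (!isPartitioned) {
            // UNPARTITIONED: a single worker, so output is always serial.
            return true;
        }
        if (!useSerialSource) {
            return false;
        }
        // Serial source: serial unless a local exchange fans out below the root.
        return rootIsSerialOperator || !rootHasSerialChildren;
    }

    public static void main(String[] args) {
        // The deadlock case: UNPARTITIONED and no scan node, so useSerialSource()
        // is false. The root flags are arbitrary; this branch ignores them.
        System.out.println(outputsSerially(false, false, false, true)); // true (old logic returned false)
        // A partitioned fragment without a serial source is still parallel.
        System.out.println(outputsSerially(true, false, true, false));  // false
    }
}
```

Testing the UNPARTITIONED case first is equivalent to the patch's
if/else, because the partitioned branch keeps the old logic unchanged.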