This is an automated email from the ASF dual-hosted git repository.

huajianlan pushed a commit to branch fe_local_shuffle
in repository https://gitbox.apache.org/repos/asf/doris.git
commit bad00c5a1994cd4e48368dca0b14768d38adf1ef
Author: 924060929 <[email protected]>
AuthorDate: Mon Mar 30 12:33:50 2026 +0800

    [fix](local shuffle) fix num_senders for UNPARTITIONED sender fragments

    Fix deadlock when FE-planned local exchange inserts PASSTHROUGH after a
    serial Exchange in an UNPARTITIONED fragment.

    The UNPARTITIONED fragment runs on only 1 worker, but
    ThriftPlansBuilder.senderFragmentOutputsSerially() did not recognize it
    as serial output (because useSerialSource() returned false for fragments
    without scan nodes). This caused num_senders to be set to
    instanceJobs.size() (e.g., 3-4) instead of the actual number of distinct
    workers (1), making the receiver Exchange wait for senders that never exist.

    Fix: In senderFragmentOutputsSerially(), treat UNPARTITIONED fragments as
    always serial (they run on 1 worker and always output serially).
---
 .../org/apache/doris/qe/runtime/ThriftPlansBuilder.java | 17 ++++++++++++++---
 1 file changed, 14 insertions(+), 3 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
index ad1000c23da..aefaef2dc03 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
@@ -297,10 +297,21 @@ public class ThriftPlansBuilder {
             PipelineDistributedPlan childPlan, ConnectContext connectContext) {
         PlanFragment fragment = childPlan.getFragmentJob().getFragment();
         PlanNode planRoot = fragment.getPlanRoot();
-        if (!fragment.useSerialSource(connectContext)) {
-            return false;
+        // A fragment outputs serially if its output pipeline has only 1 task.
+        // This happens when:
+        // 1. The fragment uses serial source (pooling scan) AND the plan root
+        //    is serial or has no serial children (no local exchange fan-out)
+        // 2. The fragment's data partition is UNPARTITIONED — it runs on only
+        //    1 worker and always outputs serially regardless of local exchange
+        if (fragment.getDataPartition().isPartitioned()) {
+            if (!fragment.useSerialSource(connectContext)) {
+                return false;
+            }
+            return planRoot.isSerialOperator() || !planRoot.hasSerialChildren();
+        } else {
+            // UNPARTITIONED fragment: only 1 worker, outputs serially
+            return true;
         }
-        return planRoot.isSerialOperator() || !planRoot.hasSerialChildren();
     }
 
     private static void setMultiCastDestinationThriftIfNotSet(PipelineDistributedPlan fragmentPlan) {

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
