This is an automated email from the ASF dual-hosted git repository. huajianlan pushed a commit to branch fe_local_shuffle in repository https://gitbox.apache.org/repos/asf/doris.git
commit 212f2fb6570730813d0c88098e5010394e4b2348 Author: 924060929 <[email protected]> AuthorDate: Tue Mar 31 14:33:33 2026 +0800 [fix](local shuffle) align FE local exchange planning with BE pipeline model Key changes to match BE's _plan_local_exchange behavior: 1. AddLocalExchange + PlanTranslatorContext: track isLocalShuffleFragment flag (true only for LocalShuffleAssignedJob / pooling-scan fragments). Skip single-instance fragments entirely — BE's _plan_local_exchange processes all pipelines but with num_instances=1 local exchange is a no-op; skipping avoids mismatched sender/receiver counts. Insert PASSTHROUGH at fragment root when isLocalShuffle && newRoot.isSerialOperator() — mirrors BE base-class required_data_distribution() returning PASSTHROUGH for serial children so all _num_instances sink tasks run and send EOS. 2. PlanNode enforceChild / enforceChildExchange / forceEnforceChildExchange: heavy_ops bottleneck avoidance — when child is serial and exchange is heavy (HASH/BUCKET_HASH/ADAPTIVE_PASSTHROUGH), insert PASSTHROUGH fan-out first. Only for pooling-scan (isLocalShuffleFragment) where serial means exactly 1 pipeline task. 3. SortNode: check isAnalyticSort first (mirrors BE _is_analytic_sort flag); remove dead fragment.useSerialSource() + instanceof ScanNode branch that has been superseded by the AddLocalExchange PASSTHROUGH-at-root logic. 4. ExchangeNode: serial Exchange (UNPARTITIONED or use_serial_exchange=true) returns NOOP — mirrors BE ExchangeSourceOperatorX::required_data_distribution(). 5. ThriftPlansBuilder: simplify computeExchangeSenderNum — remove senderFragmentOutputsSerially logic that tried to adjust sender count for serial output pipelines. With the PASSTHROUGH-at-root fix, all fragment instances always create sink tasks, so sender count = instanceJobs.size(). 
--- be/src/exec/pipeline/pipeline_fragment_context.cpp | 2 - .../glue/translator/PlanTranslatorContext.java | 15 ++++++++ .../org/apache/doris/planner/AddLocalExchange.java | 37 ++++++++++++++---- .../org/apache/doris/planner/ExchangeNode.java | 10 ++++- .../java/org/apache/doris/planner/PlanNode.java | 44 ++++++++++++++++++---- .../java/org/apache/doris/planner/SortNode.java | 17 +++------ .../doris/qe/runtime/ThriftPlansBuilder.java | 41 ++------------------ 7 files changed, 100 insertions(+), 66 deletions(-) diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp b/be/src/exec/pipeline/pipeline_fragment_context.cpp index 9a4d14494ea..abfe3048fff 100644 --- a/be/src/exec/pipeline/pipeline_fragment_context.cpp +++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp @@ -687,8 +687,6 @@ Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const Descrip Status PipelineFragmentContext::_create_deferred_local_exchangers() { for (auto& info : _deferred_exchangers) { - // num_tasks raise is handled globally in _build_and_prepare_full_pipeline - // after this function returns. No per-exchanger adjustment needed here. 
const int sender_count = info.upstream_pipe->num_tasks(); switch (info.partition_type) { case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE: diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java index afd1a8d26d3..43088d071ea 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java @@ -118,6 +118,13 @@ public class PlanTranslatorContext { private final Map<PlanNodeId, Boolean> serialAncestorInPipelineMap = Maps.newHashMap(); + // Whether the current fragment uses LocalShuffleAssignedJob (pooling scan with + // ignoreDataDistribution → _parallel_instances=1 in BE). When true, serial operators + // indicate real pipeline bottlenecks needing PASSTHROUGH fan-out (heavy_ops). + // When false (regular bucket distribution), each instance has its own scan range + // and num_tasks matches _num_instances, so no fan-out is needed. 
+ private boolean isLocalShuffleFragment = false; + private boolean isTopMaterializeNode = true; private final Set<SlotId> virtualColumnIds = Sets.newHashSet(); @@ -255,6 +262,14 @@ public class PlanTranslatorContext { return serialAncestorInPipelineMap.getOrDefault(node.getId(), false); } + public void setIsLocalShuffleFragment(boolean isLocalShuffle) { + this.isLocalShuffleFragment = isLocalShuffle; + } + + public boolean isLocalShuffleFragment() { + return isLocalShuffleFragment; + } + public SlotDescriptor addSlotDesc(TupleDescriptor t) { return descTable.addSlotDescriptor(t); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java index d982ab80385..120125ff008 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java @@ -32,35 +32,58 @@ import java.util.List; public class AddLocalExchange { public void addLocalExchange(List<PlanFragment> fragments, PlanTranslatorContext context) { for (PlanFragment fragment : fragments) { - addLocalExchangeForFragment(fragment, context); + addLocalExchangeForFragment(fragment, context, false); } } - /** addLocalExchange with distributed plans, skipping single-instance fragments (matching BE behavior) */ + /** addLocalExchange with distributed plans, skipping single-instance fragments. + * BE's _plan_local_exchange processes all pipelines regardless of _num_instances, + * but with _num_instances==1 all pipelines have 1 task so local exchange is a no-op. + * Skipping avoids inserting LOCAL_EXCHANGE_NODEs that change pipeline structure + * without benefit and may cause sender/receiver count mismatches. 
*/ public void addLocalExchange(FragmentIdMapping<DistributedPlan> distributedPlans, PlanTranslatorContext context) { for (DistributedPlan plan : distributedPlans.values()) { PipelineDistributedPlan pipePlan = (PipelineDistributedPlan) plan; int instanceCount = pipePlan.getInstanceJobs().size(); - // Match BE's early-return: if (_num_instances <= 1) return OK(); if (instanceCount <= 1) { continue; } PlanFragment fragment = pipePlan.getFragmentJob().getFragment(); - addLocalExchangeForFragment(fragment, context); + boolean isLocalShuffle = pipePlan.getInstanceJobs().stream() + .anyMatch(j -> j + instanceof org.apache.doris.nereids.trees.plans.distribute + .worker.job.LocalShuffleAssignedJob); + addLocalExchangeForFragment(fragment, context, isLocalShuffle); } } - private void addLocalExchangeForFragment(PlanFragment fragment, PlanTranslatorContext context) { + /** Enforces local exchange requirements within one fragment, replacing its plan root if needed. */ + private void addLocalExchangeForFragment(PlanFragment fragment, PlanTranslatorContext context, + boolean isLocalShuffle) { DataSink sink = fragment.getSink(); LocalExchangeTypeRequire require = sink == null ? LocalExchangeTypeRequire.noRequire() : sink.getLocalExchangeTypeRequire(); PlanNode root = fragment.getPlanRoot(); context.setHasSerialAncestorInPipeline(root, false); context.setIsLocalShuffleFragment(isLocalShuffle); Pair<PlanNode, LocalExchangeType> output = root .enforceAndDeriveLocalExchange(context, null, require); - if (output.first != root) { - fragment.setPlanRoot(output.first); + PlanNode newRoot = output.first; + // Mirror BE OperatorBase base class required_data_distribution(): + // when child (fragment root) is serial AND the fragment uses pooling scan + // (LocalShuffleAssignedJob → ignoreDataDistribution → _parallel_instances=1), + // the DataStreamSink pipeline has num_tasks=1 so only instance 0 creates a + // task and sends EOS. Downstream receivers expect _num_instances EOSes → hang. 
+ // Insert PASSTHROUGH fan-out to create _num_instances sink tasks. + // Non-pooling fragments (regular bucket distribution) have _num_instances == + // bucket_count and every instance gets a scan range, so all sink tasks run. + if (isLocalShuffle && newRoot.isSerialOperator()) { + newRoot = new LocalExchangeNode(context.nextPlanNodeId(), newRoot, + LocalExchangeType.PASSTHROUGH, null); + } + if (newRoot != root) { + fragment.setPlanRoot(newRoot); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java index 165b4d3756e..508788717c4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java @@ -180,7 +180,13 @@ public class ExchangeNode extends PlanNode { @Override public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(PlanTranslatorContext translatorContext, PlanNode parent, LocalExchangeTypeRequire parentRequire) { - if (partitionType == TPartitionType.HASH_PARTITIONED) { + // Mirror BE's ExchangeSourceOperatorX::required_data_distribution(): + // serial Exchange returns NOOP. This covers both: + // 1. UNPARTITIONED exchange (naturally serial) → already NOOP in else branch + // 2. HASH/BUCKET exchange made serial by use_serial_exchange=true → override to NOOP 
+ if (isSerialOperator()) { + return Pair.of(this, LocalExchangeType.NOOP); + } else if (partitionType == TPartitionType.HASH_PARTITIONED) { return Pair.of(this, LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE); } else if (partitionType == TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED) { return Pair.of(this, LocalExchangeType.BUCKET_HASH_SHUFFLE); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index 1bce63f1416..805d419b86b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -984,11 +984,14 @@ public abstract class PlanNode extends TreeNode<PlanNode> { return childOutput; } List<Expr> distributeExprs = childIndex >= 0 ? getChildDistributeExprList(childIndex) : null; - // Heavy ops bottleneck avoidance (mirrors BE pipeline_fragment_context.cpp:1013-1025): - // When upstream has 1 task (serial/pooling scan) and exchange is heavy (hash shuffle, - // bucket hash, adaptive passthrough), insert PASSTHROUGH fan-out first to avoid - // single-task bottleneck on the heavy exchange sink. - if (preferType.isHeavyOperation() && childOutput.first.isSerialOperator()) { + // Heavy ops bottleneck avoidance (mirrors BE pipeline_fragment_context.cpp:1025-1038): + // When upstream has 1 task (serial source) and exchange is heavy, insert PASSTHROUGH + // fan-out first to avoid single-task bottleneck on the heavy exchange sink. + // Only applies to local-shuffle fragments (pooling scan) where _parallel_instances=1 + // causes serial pipelines to have 1 task. In non-pooling fragments, each instance + // has its own scan range and all pipelines have _num_instances tasks. 
+ if (translatorContext.isLocalShuffleFragment() + && preferType.isHeavyOperation() && childOutput.first.isSerialOperator()) { PlanNode ptNode = new LocalExchangeNode(translatorContext.nextPlanNodeId(), childOutput.first, LocalExchangeType.PASSTHROUGH, null); return Pair.of( @@ -1035,9 +1039,22 @@ public abstract class PlanNode extends TreeNode<PlanNode> { if (!require.satisfy(childOutput.second)) { LocalExchangeType preferType = AddLocalExchange.resolveExchangeType( require, translatorContext, this, childOutput.first); + List<Expr> distributeExprs = getChildDistributeExprList(childIndex); + // Heavy ops bottleneck avoidance (same as enforceChild): + // serial child + heavy exchange → insert PASSTHROUGH fan-out first + // Only for local-shuffle (pooling scan) fragments where serial means 1 task. + if (translatorContext.isLocalShuffleFragment() + && preferType.isHeavyOperation() && childOutput.first.isSerialOperator()) { + PlanNode ptNode = new LocalExchangeNode(translatorContext.nextPlanNodeId(), + childOutput.first, LocalExchangeType.PASSTHROUGH, null); + return Pair.of( + new LocalExchangeNode(translatorContext.nextPlanNodeId(), ptNode, + preferType, distributeExprs), + childOutput.second); + } return Pair.of( new LocalExchangeNode(translatorContext.nextPlanNodeId(), childOutput.first, - preferType, getChildDistributeExprList(childIndex)), + preferType, distributeExprs), childOutput.second); } return childOutput; @@ -1059,9 +1076,22 @@ public abstract class PlanNode extends TreeNode<PlanNode> { if (require.preferType() != LocalExchangeType.NOOP) { LocalExchangeType preferType = AddLocalExchange.resolveExchangeType( require, translatorContext, this, childOutput.first); + List<Expr> distributeExprs = getChildDistributeExprList(childIndex); + // Heavy ops bottleneck avoidance (same as enforceChild): + // serial child + heavy exchange → insert PASSTHROUGH fan-out first + // Only for local-shuffle (pooling scan) fragments where serial means 1 task. 
+ if (translatorContext.isLocalShuffleFragment() + && preferType.isHeavyOperation() && childOutput.first.isSerialOperator()) { + PlanNode ptNode = new LocalExchangeNode(translatorContext.nextPlanNodeId(), + childOutput.first, LocalExchangeType.PASSTHROUGH, null); + return Pair.of( + new LocalExchangeNode(translatorContext.nextPlanNodeId(), ptNode, + preferType, distributeExprs), + childOutput.second); + } return Pair.of( new LocalExchangeNode(translatorContext.nextPlanNodeId(), childOutput.first, - preferType, getChildDistributeExprList(childIndex)), + preferType, distributeExprs), childOutput.second); } return childOutput; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java index 78f55ca165c..f7beb670c01 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java @@ -255,13 +255,11 @@ public class SortNode extends PlanNode { LocalExchangeTypeRequire requireChild; LocalExchangeType outputType = null; - if (children.get(0) instanceof AnalyticEvalNode) { - if (mergeByexchange) { - // Outer merge-sort above analytic: data is already sorted/partitioned by analytic, - // just need passthrough for merge. BE: SortSink._merge_by_exchange=true → PASSTHROUGH. - requireChild = LocalExchangeTypeRequire.requirePassthrough(); - outputType = LocalExchangeType.PASSTHROUGH; - } else if (AddLocalExchange.isColocated(this)) { + if (isAnalyticSort) { + // BE: SortSink._is_analytic_sort=true → required_data_distribution() = HASH. + // This sort serves a parent AnalyticEvalNode (window function) and requires + // data partitioned by the analytic's partition exprs. 
+ if (AddLocalExchange.isColocated(this)) { requireChild = LocalExchangeTypeRequire.requireHash(); outputType = AddLocalExchange.resolveExchangeType( LocalExchangeTypeRequire.requireHash(), translatorContext, this, @@ -273,11 +271,8 @@ public class SortNode extends PlanNode { // BE: SortSink._merge_by_exchange=true → required_data_distribution() = PASSTHROUGH. requireChild = LocalExchangeTypeRequire.requirePassthrough(); outputType = LocalExchangeType.PASSTHROUGH; - } else if (fragment.useSerialSource(translatorContext.getConnectContext()) - && children.get(0) instanceof ScanNode) { - requireChild = LocalExchangeTypeRequire.requirePassthrough(); - outputType = LocalExchangeType.PASSTHROUGH; } else { + // BE: else → NOOP requireChild = LocalExchangeTypeRequire.noRequire(); outputType = LocalExchangeType.NOOP; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java index ad1000c23da..2dfafd2ff6b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java @@ -126,8 +126,7 @@ public class ThriftPlansBuilder { for (PipelineDistributedPlan currentFragmentPlan : distributedPlans) { sharedFileScanRangeParams.putAll(computeFileScanRangeParams(currentFragmentPlan)); - Map<Integer, Integer> exchangeSenderNum = computeExchangeSenderNum( - currentFragmentPlan, coordinatorContext.connectContext); + Map<Integer, Integer> exchangeSenderNum = computeExchangeSenderNum(currentFragmentPlan); ListMultimap<DistributedPlanWorker, AssignedJob> instancesPerWorker = groupInstancePerWorker(currentFragmentPlan); Map<DistributedPlanWorker, TPipelineFragmentParams> workerToCurrentFragment = Maps.newLinkedHashMap(); @@ -259,50 +258,16 @@ public class ThriftPlansBuilder { } private static Map<Integer, Integer> computeExchangeSenderNum( - PipelineDistributedPlan 
distributedPlan, ConnectContext connectContext) { + PipelineDistributedPlan distributedPlan) { Map<Integer, Integer> senderNum = Maps.newLinkedHashMap(); for (Entry<ExchangeNode, DistributedPlan> kv : distributedPlan.getInputs().entries()) { ExchangeNode exchangeNode = kv.getKey(); PipelineDistributedPlan childPlan = (PipelineDistributedPlan) kv.getValue(); - List<AssignedJob> childInstances = childPlan.getInstanceJobs(); - - // The receiver's stream_recvr num_senders must match the number of tasks in the - // sender fragment's output pipeline. - // - // Local shuffle has two shapes: - // 1. serial source -> local exchange -> parallel operators -> DataStreamSink - // In this case all fragment instances still send data, so sender count must keep - // `childInstances.size()`. - // 2. serial source -> serial output pipeline -> DataStreamSink - // In this case only the first instance per worker sends data, so sender count must - // collapse to distinct workers. - boolean useLocalShuffle = childInstances.stream() - .anyMatch(LocalShuffleAssignedJob.class::isInstance); - int actualSenderCount; - if (useLocalShuffle && senderFragmentOutputsSerially(childPlan, connectContext)) { - actualSenderCount = (int) childInstances.stream() - .map(AssignedJob::getAssignedWorker) - .distinct() - .count(); - } else { - actualSenderCount = childInstances.size(); - } - - senderNum.merge(exchangeNode.getId().asInt(), actualSenderCount, Integer::sum); + senderNum.merge(exchangeNode.getId().asInt(), childPlan.getInstanceJobs().size(), Integer::sum); } return senderNum; } - private static boolean senderFragmentOutputsSerially( - PipelineDistributedPlan childPlan, ConnectContext connectContext) { - PlanFragment fragment = childPlan.getFragmentJob().getFragment(); - PlanNode planRoot = fragment.getPlanRoot(); - if (!fragment.useSerialSource(connectContext)) { - return false; - } - return planRoot.isSerialOperator() || !planRoot.hasSerialChildren(); - } - private static void 
setMultiCastDestinationThriftIfNotSet(PipelineDistributedPlan fragmentPlan) { MultiCastDataSink multiCastDataSink = (MultiCastDataSink) fragmentPlan.getFragmentJob().getFragment().getSink(); List<List<TPlanFragmentDestination>> destinationList = multiCastDataSink.getDestinations(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
