This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 443b79d6ba0dd0115f6eb31ac22447956f34bc91
Author: Gabriel <[email protected]>
AuthorDate: Wed Jan 10 14:49:46 2024 +0800

    [pipelineX](bug) Fix correctness problem using multiple BE (#29765)
---
 .../src/main/java/org/apache/doris/planner/ScanNode.java      |  4 +++-
 fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java | 10 +++++-----
 2 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index d21fab7ac9a..885bb335e49 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -724,7 +724,9 @@ public abstract class ScanNode extends PlanNode {
         return !isKeySearch() && context != null
                 && context.getSessionVariable().isIgnoreStorageDataDistribution()
                 && context.getSessionVariable().getEnablePipelineXEngine()
-                && !fragment.isHasNullAwareLeftAntiJoin();
+                && !fragment.isHasNullAwareLeftAntiJoin()
+                && ((this instanceof OlapScanNode) && ((OlapScanNode) this).getScanTabletIds().size()
+                < ConnectContext.get().getSessionVariable().getParallelExecInstanceNum());
     }
 
     public boolean haveLimitAndConjunts() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 9367316b201..efee918dfde 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -2044,9 +2044,10 @@ public class Coordinator implements CoordInterface {
                 boolean sharedScan = true;
                 int expectedInstanceNum = Math.min(parallelExecInstanceNum,
                         leftMostNode.getNumInstances());
+                boolean ignoreStorageDataDistribution = scanNodes.stream()
+                        .allMatch(scanNode -> scanNode.ignoreStorageDataDistribution(context)) && useNereids;
                 if (node.isPresent() && (!node.get().shouldDisableSharedScan(context)
-                        || (node.get().ignoreStorageDataDistribution(context)
-                        && expectedInstanceNum > perNodeScanRanges.size() && useNereids))) {
+                        || ignoreStorageDataDistribution)) {
                     expectedInstanceNum = Math.max(expectedInstanceNum, 1);
                     // if have limit and conjunts, only need 1 instance to save cpu and
                     // mem resource
@@ -2906,9 +2907,8 @@ public class Coordinator implements CoordInterface {
              * 2. `parallelExecInstanceNum` is larger than scan ranges.
              * 3. Use Nereids planner.
              */
-            boolean ignoreStorageDataDistribution = scanNodes.stream().filter(scanNode -> {
-                return scanNodeIds.contains(scanNode.getId().asInt());
-            }).allMatch(node -> node.ignoreStorageDataDistribution(context))
+            boolean ignoreStorageDataDistribution = scanNodes.stream()
+                    .allMatch(node -> node.ignoreStorageDataDistribution(context))
                     && addressToScanRanges.entrySet().stream().allMatch(addressScanRange -> {
                         return addressScanRange.getValue().size() < parallelExecInstanceNum;
                     }) && useNereids;

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
