This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new a4d124e4d3f [fix](nereids) fix union all instance number (#39999)
(#40100)
a4d124e4d3f is described below
commit a4d124e4d3f2552f06217650c4fddc824c3c6d54
Author: xzj7019 <[email protected]>
AuthorDate: Mon Sep 2 18:11:56 2024 +0800
[fix](nereids) fix union all instance number (#39999) (#40100)
pick from master #39999
---
.../main/java/org/apache/doris/qe/Coordinator.java | 43 ++++++-----
.../query_p0/union/test_union_instance.groovy | 88 ++++++++++++++++++++++
2 files changed, 111 insertions(+), 20 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 85f4201eb1e..f3b6dabbf46 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1657,7 +1657,6 @@ public class Coordinator implements CoordInterface {
}
Pair<PlanNode, PlanNode> pairNodes =
findLeftmostNode(fragment.getPlanRoot());
- PlanNode fatherNode = pairNodes.first;
PlanNode leftMostNode = pairNodes.second;
/*
@@ -1672,25 +1671,8 @@ public class Coordinator implements CoordInterface {
// (Case B)
// there is no leftmost scan; we assign the same hosts as
those of our
// input fragment which has a higher instance_number
-
- int inputFragmentIndex = 0;
- int maxParallelism = 0;
- // If the fragment has three children, then the first child
and the second child are
- // the children(both exchange node) of shuffle HashJoinNode,
- // and the third child is the right child(ExchangeNode) of
broadcast HashJoinNode.
- // We only need to pay attention to the maximum parallelism
among
- // the two ExchangeNodes of shuffle HashJoinNode.
- int childrenCount = (fatherNode != null) ?
fatherNode.getChildren().size() : 1;
- for (int j = 0; j < childrenCount; j++) {
- int currentChildFragmentParallelism
- =
fragmentExecParamsMap.get(fragment.getChild(j).getFragmentId()).instanceExecParams.size();
- if (currentChildFragmentParallelism > maxParallelism) {
- maxParallelism = currentChildFragmentParallelism;
- inputFragmentIndex = j;
- }
- }
-
- PlanFragmentId inputFragmentId =
fragment.getChild(inputFragmentIndex).getFragmentId();
+ int maxParallelFragmentIndex =
findMaxParallelFragmentIndex(fragment);
+ PlanFragmentId inputFragmentId =
fragment.getChild(maxParallelFragmentIndex).getFragmentId();
// AddAll() soft copy()
int exchangeInstances = -1;
if (ConnectContext.get() != null &&
ConnectContext.get().getSessionVariable() != null) {
@@ -1838,6 +1820,27 @@ public class Coordinator implements CoordInterface {
}
}
+ /**
+  * Finds which child of {@code fragment} has the most instances, ignoring children whose
+  * output partition is {@code DataPartition.UNPARTITIONED}.
+  *
+  * <p>The returned value is a position in {@code fragment.getChildren()}, so it can be passed
+  * directly to {@code fragment.getChild(int)}. Ties keep the earliest child. If every child is
+  * unpartitioned, or no candidate has any instance, 0 is returned.
+  *
+  * @param fragment the fragment whose child fragments are inspected; must have at least one child
+  * @return index into {@code fragment.getChildren()} of the candidate with the most instances
+  * @throws IllegalStateException if {@code fragment} has no children
+  */
+ private int findMaxParallelFragmentIndex(PlanFragment fragment) {
+     int childCount = fragment.getChildren().size();
+     Preconditions.checkState(childCount > 0, "fragment has no children");
+
+     int maxParallelism = 0;
+     int maxParaIndex = 0;
+     // Walk the original child list (not a filtered copy) so that the index we return
+     // lines up with fragment.getChild(index) at the call site.
+     for (int i = 0; i < childCount; i++) {
+         PlanFragment child = fragment.getChild(i);
+         // exclude broadcast join right side's child fragments
+         if (child.getOutputPartition() == DataPartition.UNPARTITIONED) {
+             continue;
+         }
+         int childParallelism = fragmentExecParamsMap.get(child.getFragmentId()).instanceExecParams.size();
+         if (childParallelism > maxParallelism) {
+             maxParallelism = childParallelism;
+             maxParaIndex = i;
+         }
+     }
+     return maxParaIndex;
+ }
+
// Traverse the expected runtimeFilterID in each fragment, and establish
the corresponding relationship
// between runtimeFilterID and fragment instance addr and select the merge
instance of runtimeFilter
private void assignRuntimeFilterAddr() throws Exception {
diff --git a/regression-test/suites/query_p0/union/test_union_instance.groovy
b/regression-test/suites/query_p0/union/test_union_instance.groovy
new file mode 100644
index 00000000000..6829a48989b
--- /dev/null
+++ b/regression-test/suites/query_p0/union/test_union_instance.groovy
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_union_instance") { // Creates tables t1..t4, then inspects the EXPLAIN text of a CROSS JOIN ... UNION ALL ... shuffle-join query.
+ multi_sql """
+ drop table if exists t1;
+ drop table if exists t2;
+ drop table if exists t3;
+ drop table if exists t4;
+ CREATE TABLE `t1` (
+ `c1` int NULL,
+ `c2` int NULL,
+ `c3` int NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`c1`, `c2`, `c3`)
+ DISTRIBUTED BY HASH(`c1`) BUCKETS 3
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1");
+
+ insert into t1 values (1,1,1);
+ insert into t1 values (2,2,2);
+ insert into t1 values (3,3,3);
+
+ CREATE TABLE `t2` (
+ `year_week` varchar(45) NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`year_week`)
+ DISTRIBUTED BY HASH(`year_week`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1");
+
+ CREATE TABLE `t3` (
+ `unique_key` varchar(2999) NULL,
+ `brand_name` varchar(96) NULL,
+ `skc` varchar(150) NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(`unique_key`)
+ DISTRIBUTED BY HASH(`unique_key`) BUCKETS 20
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1");
+
+ CREATE TABLE `t4` (
+ `skc_code` varchar(150) NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(`skc_code`)
+ DISTRIBUTED BY HASH(`skc_code`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1");
+
+ set parallel_pipeline_task_num=1;
+ set disable_nereids_rules='PRUNE_EMPTY_PARTITION';
+ """ // Setup ends here: only t1 receives rows (t2, t3, t4 stay empty); the two session settings above are applied before the EXPLAIN.
+ explain { // NOTE(review): presumably each `contains` below asserts a substring of the EXPLAIN output - confirm against the regression framework.
+ sql """
+ SELECT `t`.`year_week` AS `year_week`
+ ,'' AS `brand_name`
+ , '' AS `skc_code`
+ FROM `t1` a
+ CROSS JOIN `t2` t
+ union all
+ SELECT '1' AS `year_week`
+ ,`a`.`brand_name` AS `brand_name`
+ ,`a`.`skc` AS `skc_code`
+ FROM `t3` a
+ INNER JOIN[shuffle] `t4` b ON `a`.`skc` = `b`.`skc_code`
+ GROUP BY 1, 2, 3;
+ """
+ contains "999:VNESTED LOOP JOIN" // NOTE(review): the numeric prefixes look like plan node ids and are hard-coded - verify they stay stable across planner changes.
+ contains "1005:VEXCHANGE"
+ contains "1015:VEXCHANGE"
+ contains "1025:VEXCHANGE"
+ contains "1040:VUNION"
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]