This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new a4d124e4d3f [fix](nereids) fix union all instance number (#39999) 
(#40100)
a4d124e4d3f is described below

commit a4d124e4d3f2552f06217650c4fddc824c3c6d54
Author: xzj7019 <[email protected]>
AuthorDate: Mon Sep 2 18:11:56 2024 +0800

    [fix](nereids) fix union all instance number (#39999) (#40100)
    
    pick from master #39999
---
 .../main/java/org/apache/doris/qe/Coordinator.java | 43 ++++++-----
 .../query_p0/union/test_union_instance.groovy      | 88 ++++++++++++++++++++++
 2 files changed, 111 insertions(+), 20 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 85f4201eb1e..f3b6dabbf46 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1657,7 +1657,6 @@ public class Coordinator implements CoordInterface {
             }
 
             Pair<PlanNode, PlanNode> pairNodes = 
findLeftmostNode(fragment.getPlanRoot());
-            PlanNode fatherNode = pairNodes.first;
             PlanNode leftMostNode = pairNodes.second;
 
             /*
@@ -1672,25 +1671,8 @@ public class Coordinator implements CoordInterface {
                 // (Case B)
                 // there is no leftmost scan; we assign the same hosts as 
those of our
                 //  input fragment which has a higher instance_number
-
-                int inputFragmentIndex = 0;
-                int maxParallelism = 0;
-                // If the fragment has three children, then the first child 
and the second child are
-                // the children(both exchange node) of shuffle HashJoinNode,
-                // and the third child is the right child(ExchangeNode) of 
broadcast HashJoinNode.
-                // We only need to pay attention to the maximum parallelism 
among
-                // the two ExchangeNodes of shuffle HashJoinNode.
-                int childrenCount = (fatherNode != null) ? 
fatherNode.getChildren().size() : 1;
-                for (int j = 0; j < childrenCount; j++) {
-                    int currentChildFragmentParallelism
-                            = 
fragmentExecParamsMap.get(fragment.getChild(j).getFragmentId()).instanceExecParams.size();
-                    if (currentChildFragmentParallelism > maxParallelism) {
-                        maxParallelism = currentChildFragmentParallelism;
-                        inputFragmentIndex = j;
-                    }
-                }
-
-                PlanFragmentId inputFragmentId = 
fragment.getChild(inputFragmentIndex).getFragmentId();
+                int maxParallelFragmentIndex = 
findMaxParallelFragmentIndex(fragment);
+                PlanFragmentId inputFragmentId = 
fragment.getChild(maxParallelFragmentIndex).getFragmentId();
                 // AddAll() soft copy()
                 int exchangeInstances = -1;
                 if (ConnectContext.get() != null && 
ConnectContext.get().getSessionVariable() != null) {
@@ -1838,6 +1820,27 @@ public class Coordinator implements CoordInterface {
         }
     }
 
+    // Returns the index within fragment.getChildren() of the non-broadcast child with the most instances,
+    // so the result can be passed straight to fragment.getChild(); returns 0 when every child is skipped.
+    private int findMaxParallelFragmentIndex(PlanFragment fragment) {
+        Preconditions.checkState(!fragment.getChildren().isEmpty(), "fragment has no children");
+
+        int maxParallelism = 0;
+        int maxParaIndex = 0;
+        for (int i = 0; i < fragment.getChildren().size(); i++) {
+            PlanFragment child = fragment.getChild(i);
+            if (child.getOutputPartition() == DataPartition.UNPARTITIONED) {
+                continue; // exclude broadcast join right side's child fragments
+            }
+            int parallelism = fragmentExecParamsMap.get(child.getFragmentId()).instanceExecParams.size();
+            if (parallelism > maxParallelism) {
+                maxParallelism = parallelism;
+                maxParaIndex = i; // position in the unfiltered child list, as the caller expects
+            }
+        }
+        return maxParaIndex;
+    }
+
     // Traverse the expected runtimeFilterID in each fragment, and establish 
the corresponding relationship
     // between runtimeFilterID and fragment instance addr and select the merge 
instance of runtimeFilter
     private void assignRuntimeFilterAddr() throws Exception {
diff --git a/regression-test/suites/query_p0/union/test_union_instance.groovy 
b/regression-test/suites/query_p0/union/test_union_instance.groovy
new file mode 100644
index 00000000000..6829a48989b
--- /dev/null
+++ b/regression-test/suites/query_p0/union/test_union_instance.groovy
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_union_instance") {
+        multi_sql """
+           drop table if exists t1;
+           drop table if exists t2;
+           drop table if exists t3;
+           drop table if exists t4;
+           CREATE TABLE `t1` (
+              `c1` int NULL,
+              `c2` int NULL,
+              `c3` int NULL
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`c1`, `c2`, `c3`)
+            DISTRIBUTED BY HASH(`c1`) BUCKETS 3
+            PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1");
+         
+           insert into t1 values (1,1,1);
+           insert into t1 values (2,2,2);
+           insert into t1 values (3,3,3);
+
+            CREATE TABLE `t2` (
+              `year_week` varchar(45) NULL
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`year_week`)
+            DISTRIBUTED BY HASH(`year_week`) BUCKETS 1
+            PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1");
+            
+            CREATE TABLE `t3` (
+              `unique_key` varchar(2999) NULL,
+              `brand_name` varchar(96) NULL,
+              `skc` varchar(150) NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`unique_key`)
+            DISTRIBUTED BY HASH(`unique_key`) BUCKETS 20
+            PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1");
+            
+            CREATE TABLE `t4` (
+              `skc_code` varchar(150) NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`skc_code`)
+            DISTRIBUTED BY HASH(`skc_code`) BUCKETS 1
+            PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1");
+            
+            set parallel_pipeline_task_num=1;
+            set disable_nereids_rules='PRUNE_EMPTY_PARTITION';
+       """
+       explain {
+                sql """
+                SELECT `t`.`year_week` AS `year_week`
+                ,'' AS `brand_name`
+                , '' AS `skc_code`
+                FROM `t1` a
+                CROSS JOIN `t2` t
+                union all
+                SELECT '1' AS `year_week`
+                ,`a`.`brand_name` AS `brand_name`
+                ,`a`.`skc` AS `skc_code`
+                FROM `t3` a
+                INNER JOIN[shuffle] `t4` b ON `a`.`skc` = `b`.`skc_code`
+                GROUP BY 1, 2, 3;
+                """
+               contains "999:VNESTED LOOP JOIN"
+               contains "1005:VEXCHANGE"
+               contains "1015:VEXCHANGE"
+               contains "1025:VEXCHANGE"
+               contains "1040:VUNION"
+        }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to