This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 94c71fa6cb173f3acb4dbf4b0fcc16da9ba22ce2
Author: EmmyMiao87 <[email protected]>
AuthorDate: Sun Apr 3 10:19:39 2022 +0800

    [fix](colocate) Fix the error colocate plan when query is (rollup + 
instance >1) (#8779)
    
    The Repeat Node will change the fragment data partition.
    
    So the output partition of child fragment is different from the data 
partition of current fragment.
    When judging whether colocate can be enabled,
    the current data partition of fragment should be used directly instead of 
the child's output partition.
    
    Before this PR fix, queries with '''rollup + concurrency greater than 1''' 
may have incorrect results.
    For example:
    ```
    select t1.tc1,t1.tc2,sum(t1.tc3) as total from t1 join[shuffle] t1 t2 on 
t1.tc1=t2.tc1
    group by rollup(tc1,tc2) order by t1.tc1,t1.tc2,total;
    ```
    
    Fixed #8778
---
 .../org/apache/doris/planner/DistributedPlanner.java  | 10 ++++------
 .../java/org/apache/doris/planner/PlanFragment.java   |  9 ---------
 .../org/apache/doris/planner/ColocatePlanTest.java    | 19 +++++++++++++++++++
 3 files changed, 23 insertions(+), 15 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
index eb099c26ec..90dda46638 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
@@ -927,7 +927,7 @@ public class DistributedPlanner {
         if (isDistinct) {
             return createPhase2DistinctAggregationFragment(node, 
childFragment, fragments);
         } else {
-            if (canColocateAgg(node.getAggInfo(), 
childFragment.getInputDataPartition())) {
+            if (canColocateAgg(node.getAggInfo(), 
childFragment.getDataPartition())) {
                 childFragment.addPlanRoot(node);
                 childFragment.setHasColocatePlanNode(true);
                 return childFragment;
@@ -942,7 +942,7 @@ public class DistributedPlanner {
      * 1. Session variables disable_colocate_plan = false
      * 2. The input data partition of child fragment < agg node partition exprs
      */
-    private boolean canColocateAgg(AggregateInfo aggregateInfo, 
List<DataPartition> childFragmentDataPartition) {
+    private boolean canColocateAgg(AggregateInfo aggregateInfo, DataPartition 
childFragmentDataPartition) {
         // Condition1
         if (ConnectContext.get().getSessionVariable().isDisableColocatePlan()) 
{
             LOG.debug("Agg node is not colocate in:" + 
ConnectContext.get().queryId()
@@ -952,10 +952,8 @@ public class DistributedPlanner {
 
         // Condition2
         List<Expr> aggPartitionExprs = aggregateInfo.getInputPartitionExprs();
-        for (DataPartition childDataPartition : childFragmentDataPartition) {
-            if (dataPartitionMatchAggInfo(childDataPartition, 
aggPartitionExprs)) {
-                return true;
-            }
+        if (dataPartitionMatchAggInfo(childFragmentDataPartition, 
aggPartitionExprs)) {
+            return true;
         }
         return false;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
index 6f14e44067..c5d01e6249 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
@@ -329,15 +329,6 @@ public class PlanFragment extends TreeNode<PlanFragment> {
         dest.addChild(this);
     }
 
-    public List<DataPartition> getInputDataPartition() {
-        List<DataPartition> result = Lists.newArrayList();
-        result.add(getDataPartition());
-        for (PlanFragment child : children) {
-            result.add(child.getOutputPartition());
-        }
-        return result;
-    }
-
     public DataPartition getDataPartition() {
         return dataPartition;
     }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java
index 43d062a806..716e3c68be 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java
@@ -186,4 +186,23 @@ public class ColocatePlanTest {
         Assert.assertTrue(isColocateFragment1);
     }
 
+    // Fix #8778
+    @Test
+    public void rollupAndMoreThanOneInstanceWithoutColocate() throws Exception 
{
+        String createColocateTblStmtStr = "create table 
db1.test_colocate_one_backend(k1 int, k2 int, k3 int, k4 int) "
+                + "distributed by hash(k1, k2, k3) buckets 10 
properties('replication_num' = '1');";
+        CreateTableStmt createColocateTableStmt = (CreateTableStmt) 
UtFrameUtils.parseAndAnalyzeStmt(createColocateTblStmtStr, ctx);
+        Catalog.getCurrentCatalog().createTable(createColocateTableStmt);
+
+        String sql = "select a.k1, a.k2, sum(a.k3) "
+                + "from db1.test_colocate_one_backend a join[shuffle] 
db1.test_colocate_one_backend b on a.k1=b.k1 "
+                + "group by rollup(a.k1, a.k2);";
+        Deencapsulation.setField(ctx.getSessionVariable(), 
"parallelExecInstanceNum", 2);
+        String plan1 = UtFrameUtils.getSQLPlanOrErrorMsg(ctx, sql);
+        Assert.assertEquals(2, StringUtils.countMatches(plan1, "AGGREGATE"));
+        Assert.assertEquals(5, StringUtils.countMatches(plan1, "PLAN 
FRAGMENT"));
+
+    }
+
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to