This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bdd854fdc [fix](nereids) bucket shuffle and colocate join is not 
correctly recognized (#17807)
7bdd854fdc is described below

commit 7bdd854fdcc7136b5668a26b8ba67fc61ba6b576
Author: starocean999 <[email protected]>
AuthorDate: Fri Mar 24 19:21:41 2023 +0800

    [fix](nereids) bucket shuffle and colocate join is not correctly recognized 
(#17807)
    
    1. close (https://github.com/apache/doris/issues/16458) for nereids
    2. varchar and string types should be treated as the same type in the bucket
shuffle join scenario.
    ```
    create table shuffle_join_t1 ( a varchar(10) not null )
    create table shuffle_join_t2 ( a varchar(5) not null, b string not null, c 
char(3) not null )
    ```
    the below 2 SQL statements can use bucket shuffle join
    ```
    select * from shuffle_join_t1 t1 left join shuffle_join_t2 t2 on t1.a = 
t2.a;
    select * from shuffle_join_t1 t1 left join shuffle_join_t2 t2 on t1.a = 
t2.b;
    ```
    3. PushdownExpressionsInHashCondition should consider both hash and other 
conjuncts
    4. visitPhysicalProject should handle MarkJoinSlotReference
---
 .../main/java/org/apache/doris/catalog/Type.java   |   5 +
 .../glue/translator/PhysicalPlanTranslator.java    |  13 ++
 .../properties/ChildrenPropertiesRegulator.java    |   2 +-
 .../nereids/properties/DistributionSpecHash.java   |  38 +++-
 .../LogicalOlapScanToPhysicalOlapScan.java         |   4 +-
 .../PushdownExpressionsInHashCondition.java        |  87 ++++++----
 .../nereids/trees/plans/logical/LogicalJoin.java   |   7 +
 .../trees/plans/logical/LogicalOlapScan.java       |   7 +-
 .../org/apache/doris/nereids/types/StringType.java |   2 +-
 .../apache/doris/nereids/types/VarcharType.java    |   2 +-
 .../org/apache/doris/nereids/util/JoinUtils.java   |   9 +-
 .../doris/nereids/util/TypeCoercionUtils.java      |   8 +-
 .../apache/doris/planner/DistributedPlanner.java   |   6 +-
 .../nereids/postprocess/RuntimeFilterTest.java     |   2 +-
 .../logical/PruneOlapScanPartitionTest.java        |   2 +
 .../rewrite/logical/PruneOlapScanTabletTest.java   |   2 +
 .../PushdownExpressionsInHashConditionTest.java    |  19 --
 .../doris/nereids/types/AbstractDataTypeTest.java  |   4 +-
 .../correctness_p0/test_bucket_shuffle_join.groovy |  46 +++++
 .../nereids_p0/join/bucket_shuffle_join.groovy     |  58 +++++++
 .../join/colocate_join_with_rollup.groovy          | 191 +++++++++++++++++++++
 21 files changed, 438 insertions(+), 76 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/catalog/Type.java 
b/fe/fe-common/src/main/java/org/apache/doris/catalog/Type.java
index c5767d6c34..8f991c92f3 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/catalog/Type.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/catalog/Type.java
@@ -372,6 +372,11 @@ public abstract class Type {
                 || isScalarType(PrimitiveType.STRING);
     }
 
+    public boolean isVarcharOrStringType() {
+        return isScalarType(PrimitiveType.VARCHAR)
+                || isScalarType(PrimitiveType.STRING);
+    }
+
     public boolean isVarchar() {
         return isScalarType(PrimitiveType.VARCHAR);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index bdc463662b..6f5b71df9f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -1366,11 +1366,18 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
     // TODO: generate expression mapping when be project could do in ExecNode.
     @Override
     public PlanFragment visitPhysicalProject(PhysicalProject<? extends Plan> 
project, PlanTranslatorContext context) {
+        MarkJoinSlotReference markJoinSlot = null;
         if (project.child(0) instanceof PhysicalHashJoin) {
             ((PhysicalHashJoin<?, ?>) 
project.child(0)).setShouldTranslateOutput(false);
+            if (((PhysicalHashJoin<?, ?>) 
project.child(0)).getMarkJoinSlotReference().isPresent()) {
+                markJoinSlot = (((PhysicalHashJoin<?, ?>) 
project.child(0)).getMarkJoinSlotReference().get());
+            }
         }
         if (project.child(0) instanceof PhysicalNestedLoopJoin) {
             ((PhysicalNestedLoopJoin<?, ?>) 
project.child(0)).setShouldTranslateOutput(false);
+            if (((PhysicalNestedLoopJoin<?, ?>) 
project.child(0)).getMarkJoinSlotReference().isPresent()) {
+                markJoinSlot = (((PhysicalNestedLoopJoin<?, ?>) 
project.child(0)).getMarkJoinSlotReference().get());
+            }
         }
         if (project.child(0) instanceof PhysicalFilter) {
             if (project.child(0).child(0) instanceof PhysicalHashJoin) {
@@ -1392,6 +1399,12 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                 .stream()
                 .map(e -> e.toSlot())
                 .collect(Collectors.toList());
+
+        if (markJoinSlot != null) {
+            // add mark join slot to output
+            slotList.add(markJoinSlot);
+            execExprList.add(ExpressionTranslator.translate(markJoinSlot, 
context));
+        }
         // For hash join node, use vSrcToOutputSMap to describe the expression 
calculation, use
         // vIntermediateTupleDescList as input, and set vOutputTupleDesc as 
the final output.
         // TODO: HashJoinNode's be implementation is not support projection 
yet, remove this after when supported.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
index abfc0f440d..b2343bb501 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
@@ -172,7 +172,7 @@ public class ChildrenPropertiesRegulator extends 
PlanVisitor<Double, Void> {
             
rightShuffleIds.add(rightRequireSpec.getOrderedShuffledColumns().get(index));
         }
         return new PhysicalProperties(new 
DistributionSpecHash(rightShuffleIds, ShuffleType.ENFORCED,
-                rightHashSpec.getTableId(), rightHashSpec.getPartitionIds()));
+                rightHashSpec.getTableId(), 
rightHashSpec.getSelectedIndexId(), rightHashSpec.getPartitionIds()));
     }
 
     private double updateChildEnforceAndCost(GroupExpression child, 
PhysicalProperties childOutput,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java
index 72bf7e34df..6a7f899c2a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java
@@ -51,6 +51,8 @@ public class DistributionSpecHash extends DistributionSpec {
 
     private final Set<Long> partitionIds;
 
+    private final long selectedIndexId;
+
     // use for satisfied judge
     private final List<Set<ExprId>> equivalenceExprIds;
 
@@ -79,14 +81,23 @@ public class DistributionSpecHash extends DistributionSpec {
     }
 
     /**
-     * Normal constructor.
+     * Used in ut
      */
     public DistributionSpecHash(List<ExprId> orderedShuffledColumns, 
ShuffleType shuffleType,
             long tableId, Set<Long> partitionIds) {
+        this(orderedShuffledColumns, shuffleType, tableId, -1L, partitionIds);
+    }
+
+    /**
+     * Normal constructor.
+     */
+    public DistributionSpecHash(List<ExprId> orderedShuffledColumns, 
ShuffleType shuffleType,
+            long tableId, long selectedIndexId, Set<Long> partitionIds) {
         this.orderedShuffledColumns = 
Objects.requireNonNull(orderedShuffledColumns);
         this.shuffleType = Objects.requireNonNull(shuffleType);
         this.partitionIds = Objects.requireNonNull(partitionIds);
         this.tableId = tableId;
+        this.selectedIndexId = selectedIndexId;
         equivalenceExprIds = 
Lists.newArrayListWithCapacity(orderedShuffledColumns.size());
         exprIdToEquivalenceSet = 
Maps.newHashMapWithExpectedSize(orderedShuffledColumns.size());
         int i = 0;
@@ -96,15 +107,26 @@ public class DistributionSpecHash extends DistributionSpec 
{
         }
     }
 
+    /**
+     * Used in ut
+     */
+    public DistributionSpecHash(List<ExprId> orderedShuffledColumns, 
ShuffleType shuffleType,
+            long tableId, Set<Long> partitionIds, List<Set<ExprId>> 
equivalenceExprIds,
+            Map<ExprId, Integer> exprIdToEquivalenceSet) {
+        this(orderedShuffledColumns, shuffleType, tableId, -1L, partitionIds, 
equivalenceExprIds,
+                exprIdToEquivalenceSet);
+    }
+
     /**
      * Used in merge outside and put result into it.
      */
     public DistributionSpecHash(List<ExprId> orderedShuffledColumns, 
ShuffleType shuffleType, long tableId,
-            Set<Long> partitionIds, List<Set<ExprId>> equivalenceExprIds,
+            long selectedIndexId, Set<Long> partitionIds, List<Set<ExprId>> 
equivalenceExprIds,
             Map<ExprId, Integer> exprIdToEquivalenceSet) {
         this.orderedShuffledColumns = 
Objects.requireNonNull(orderedShuffledColumns);
         this.shuffleType = Objects.requireNonNull(shuffleType);
         this.tableId = tableId;
+        this.selectedIndexId = selectedIndexId;
         this.partitionIds = Objects.requireNonNull(partitionIds);
         this.equivalenceExprIds = Objects.requireNonNull(equivalenceExprIds);
         this.exprIdToEquivalenceSet = 
Objects.requireNonNull(exprIdToEquivalenceSet);
@@ -124,7 +146,8 @@ public class DistributionSpecHash extends DistributionSpec {
         exprIdToEquivalenceSet.putAll(left.getExprIdToEquivalenceSet());
         exprIdToEquivalenceSet.putAll(right.getExprIdToEquivalenceSet());
         return new DistributionSpecHash(orderedShuffledColumns, shuffleType,
-                left.getTableId(), left.getPartitionIds(), equivalenceExprIds, 
exprIdToEquivalenceSet);
+                left.getTableId(), left.getSelectedIndexId(), 
left.getPartitionIds(), equivalenceExprIds,
+                exprIdToEquivalenceSet);
     }
 
     static DistributionSpecHash merge(DistributionSpecHash left, 
DistributionSpecHash right) {
@@ -143,6 +166,10 @@ public class DistributionSpecHash extends DistributionSpec 
{
         return tableId;
     }
 
+    public long getSelectedIndexId() {
+        return selectedIndexId;
+    }
+
     public Set<Long> getPartitionIds() {
         return partitionIds;
     }
@@ -219,7 +246,7 @@ public class DistributionSpecHash extends DistributionSpec {
     }
 
     public DistributionSpecHash withShuffleType(ShuffleType shuffleType) {
-        return new DistributionSpecHash(orderedShuffledColumns, shuffleType, 
tableId, partitionIds,
+        return new DistributionSpecHash(orderedShuffledColumns, shuffleType, 
tableId, selectedIndexId, partitionIds,
                 equivalenceExprIds, exprIdToEquivalenceSet);
     }
 
@@ -256,7 +283,7 @@ public class DistributionSpecHash extends DistributionSpec {
                 exprIdToEquivalenceSet.put(exprIdSetKV.getKey(), 
exprIdSetKV.getValue());
             }
         }
-        return new DistributionSpecHash(orderedShuffledColumns, shuffleType, 
tableId, partitionIds,
+        return new DistributionSpecHash(orderedShuffledColumns, shuffleType, 
tableId, selectedIndexId, partitionIds,
                 equivalenceExprIds, exprIdToEquivalenceSet);
     }
 
@@ -280,6 +307,7 @@ public class DistributionSpecHash extends DistributionSpec {
                 "orderedShuffledColumns", orderedShuffledColumns,
                 "shuffleType", shuffleType,
                 "tableId", tableId,
+                "selectedIndexId", selectedIndexId,
                 "partitionIds", partitionIds,
                 "equivalenceExprIds", equivalenceExprIds,
                 "exprIdToEquivalenceSet", exprIdToEquivalenceSet);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java
index 6ecddb3bda..ea3e4c0530 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java
@@ -89,8 +89,8 @@ public class LogicalOlapScanToPhysicalOlapScan extends 
OneImplementationRuleFact
                 }
             }
             // TODO: need to consider colocate and dynamic partition and 
partition number
-            return new DistributionSpecHash(hashColumns, ShuffleType.NATURAL,
-                    olapScan.getTable().getId(), 
Sets.newHashSet(olapScan.getTable().getPartitionIds()));
+            return new DistributionSpecHash(hashColumns, ShuffleType.NATURAL, 
olapScan.getTable().getId(),
+                    olapScan.getSelectedIndexId(), 
Sets.newHashSet(olapScan.getSelectedPartitionIds()));
         } else {
             // RandomDistributionInfo
             return DistributionSpecAny.INSTANCE;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownExpressionsInHashCondition.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownExpressionsInHashCondition.java
index bc92d8f37c..404b1dc22f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownExpressionsInHashCondition.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownExpressionsInHashCondition.java
@@ -22,6 +22,7 @@ import org.apache.doris.nereids.rules.RuleType;
 import org.apache.doris.nereids.rules.rewrite.OneRewriteRuleFactory;
 import org.apache.doris.nereids.trees.expressions.Alias;
 import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.ExprId;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.NamedExpression;
 import org.apache.doris.nereids.trees.expressions.Slot;
@@ -29,20 +30,16 @@ import 
org.apache.doris.nereids.trees.expressions.SlotReference;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
 import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
-import org.apache.doris.nereids.util.ExpressionUtils;
 import org.apache.doris.nereids.util.JoinUtils;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 /**
  * push down expression which is not slot reference
@@ -67,52 +64,66 @@ public class PushdownExpressionsInHashCondition extends 
OneRewriteRuleFactory {
     public Rule build() {
         return logicalJoin()
                 .when(join -> 
join.getHashJoinConjuncts().stream().anyMatch(equalTo ->
-                        equalTo.children().stream().anyMatch(e -> 
!ExpressionUtils.checkTypeSkipCast(e, Slot.class))))
+                        equalTo.children().stream().anyMatch(e -> !(e 
instanceof Slot))))
                 .then(join -> {
-                    List<List<Expression>> exprsOfHashConjuncts =
-                            Lists.newArrayList(Lists.newArrayList(), 
Lists.newArrayList());
-                    Map<Expression, NamedExpression> exprMap = 
Maps.newHashMap();
+                    Set<NamedExpression> leftProjectExprs = Sets.newHashSet();
+                    Set<NamedExpression> rightProjectExprs = Sets.newHashSet();
+                    Map<Expression, NamedExpression> exprReplaceMap = 
Maps.newHashMap();
                     join.getHashJoinConjuncts().forEach(conjunct -> {
                         Preconditions.checkArgument(conjunct instanceof 
EqualTo);
                         // sometimes: t1 join t2 on t2.a + 1 = t1.a + 2, so 
check the situation, but actually it
                         // doesn't swap the two sides.
                         conjunct = JoinUtils.swapEqualToForChildrenOrder(
                                 (EqualTo) conjunct, 
join.left().getOutputSet());
-                        exprsOfHashConjuncts.get(0).add(conjunct.child(0));
-                        exprsOfHashConjuncts.get(1).add(conjunct.child(1));
-                        conjunct.children().forEach(expr -> {
-                            if ((expr instanceof SlotReference)) {
-                                exprMap.put(expr, (SlotReference) expr);
-                            } else {
-                                exprMap.put(expr, new Alias(expr, "expr_" + 
expr.toSql()));
-                            }
-                        });
+                        generateReplaceMapAndProjectExprs(conjunct.child(0), 
exprReplaceMap, leftProjectExprs);
+                        generateReplaceMapAndProjectExprs(conjunct.child(1), 
exprReplaceMap, rightProjectExprs);
                     });
-                    Iterator<List<Expression>> iter = 
exprsOfHashConjuncts.iterator();
+
+                    // add other conjuncts used slots to project exprs
+                    Set<ExprId> leftExprIdSet = 
join.left().getOutputExprIdSet();
+                    join.getOtherJoinConjuncts().stream().flatMap(conjunct ->
+                            conjunct.getInputSlots().stream()
+                    ).forEach(slot -> {
+                        if (leftExprIdSet.contains(slot.getExprId())) {
+                            // belong to left child
+                            leftProjectExprs.add(slot);
+                        } else {
+                            // belong to right child
+                            rightProjectExprs.add(slot);
+                        }
+                    });
+
+                    List<Expression> newHashConjuncts = 
join.getHashJoinConjuncts().stream()
+                            .map(equalTo -> 
equalTo.withChildren(equalTo.children()
+                                    .stream().map(expr -> 
exprReplaceMap.get(expr).toSlot())
+                                    .collect(ImmutableList.toImmutableList())))
+                            .collect(ImmutableList.toImmutableList());
                     return join.withHashJoinConjunctsAndChildren(
-                            join.getHashJoinConjuncts().stream()
-                                    .map(equalTo -> 
equalTo.withChildren(equalTo.children()
-                                            .stream().map(expr -> 
exprMap.get(expr).toSlot())
-                                            
.collect(ImmutableList.toImmutableList())))
-                                    .collect(ImmutableList.toImmutableList()),
-                            join.children().stream().map(
-                                        plan -> {
-                                            Set<NamedExpression> projectSet = 
Sets.newHashSet();
-                                            
projectSet.addAll(iter.next().stream().map(exprMap::get)
-                                                    
.collect(Collectors.toList()));
-                                            projectSet.addAll(getOutput(plan, 
join));
-                                            List<NamedExpression> projectList 
= projectSet.stream()
-                                                    
.collect(ImmutableList.toImmutableList());
-                                            return new 
LogicalProject<>(projectList, plan);
-                                        }
-                                    )
-                                    .collect(ImmutableList.toImmutableList()));
+                            newHashConjuncts,
+                            createChildProjectPlan(join.left(), join, 
leftProjectExprs),
+                            createChildProjectPlan(join.right(), join, 
rightProjectExprs));
                 }).toRule(RuleType.PUSHDOWN_EXPRESSIONS_IN_HASH_CONDITIONS);
     }
 
-    private List<Slot> getOutput(Plan plan, LogicalJoin join) {
-        Set<Slot> intersectionSlots = Sets.newHashSet(plan.getOutputSet());
+    private LogicalProject createChildProjectPlan(Plan plan, LogicalJoin join,
+            Set<NamedExpression> conditionUsedExprs) {
+        Set<NamedExpression> intersectionSlots = 
Sets.newHashSet(plan.getOutputSet());
         intersectionSlots.retainAll(join.getOutputSet());
-        return Lists.newArrayList(intersectionSlots);
+        intersectionSlots.addAll(conditionUsedExprs);
+        return new LogicalProject(intersectionSlots.stream()
+                .collect(ImmutableList.toImmutableList()), plan);
+    }
+
+    private void generateReplaceMapAndProjectExprs(Expression expr, 
Map<Expression, NamedExpression> replaceMap,
+            Set<NamedExpression> projects) {
+        if (expr instanceof SlotReference) {
+            projects.add((SlotReference) expr);
+            replaceMap.put(expr, (SlotReference) expr);
+        } else {
+            Alias alias = new Alias(expr, "expr_" + expr.toSql());
+            if (replaceMap.putIfAbsent(expr, alias.toSlot()) == null) {
+                projects.add(alias);
+            }
+        }
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalJoin.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalJoin.java
index 1317e79a9d..319d025e6c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalJoin.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalJoin.java
@@ -285,6 +285,13 @@ public class LogicalJoin<LEFT_CHILD_TYPE extends Plan, 
RIGHT_CHILD_TYPE extends
                 children.get(1), joinReorderContext);
     }
 
+    public LogicalJoin<Plan, Plan> withHashJoinConjunctsAndChildren(
+            List<Expression> hashJoinConjuncts, Plan left, Plan right) {
+        Preconditions.checkArgument(children.size() == 2);
+        return new LogicalJoin<>(joinType, hashJoinConjuncts, 
otherJoinConjuncts, hint,
+                markJoinSlotReference, left, right, joinReorderContext);
+    }
+
     public LogicalJoin<Plan, Plan> withConjunctsChildren(List<Expression> 
hashJoinConjuncts,
             List<Expression> otherJoinConjuncts, Plan left, Plan right) {
         return new LogicalJoin<>(joinType, hashJoinConjuncts, 
otherJoinConjuncts, hint, markJoinSlotReference, left,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java
index 79a128cd8f..ed7fa920e1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java
@@ -47,6 +47,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
@@ -143,14 +144,16 @@ public class LogicalOlapScan extends LogicalRelation 
implements CatalogRelation,
 
         super(id, PlanType.LOGICAL_OLAP_SCAN, table, qualifier,
                 groupExpression, logicalProperties);
+        Preconditions.checkArgument(selectedPartitionIds != null, 
"selectedPartitionIds can not be null");
         this.selectedTabletIds = ImmutableList.copyOf(selectedTabletIds);
         this.partitionPruned = partitionPruned;
         this.selectedIndexId = selectedIndexId <= 0 ? 
getTable().getBaseIndexId() : selectedIndexId;
         this.indexSelected = indexSelected;
         this.preAggStatus = preAggStatus;
         this.manuallySpecifiedPartitions = ImmutableList.copyOf(partitions);
-        this.selectedPartitionIds = ImmutableList.copyOf(
-                Objects.requireNonNull(selectedPartitionIds, 
"selectedPartitionIds can not be null"));
+        this.selectedPartitionIds = selectedPartitionIds.stream()
+                .filter(partitionId -> 
this.getTable().getPartition(partitionId).hasData()).collect(
+                        Collectors.toList());
         this.hints = Objects.requireNonNull(hints, "hints can not be null");
         this.mvNameToSlot = Objects.requireNonNull(mvNameToSlot, "mvNameToSlot 
can not be null");
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/types/StringType.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/types/StringType.java
index 4cbc869191..8658278e07 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/types/StringType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/types/StringType.java
@@ -44,7 +44,7 @@ public class StringType extends CharacterType {
 
     @Override
     public boolean acceptsType(AbstractDataType other) {
-        return other instanceof StringType;
+        return other instanceof StringType || other instanceof VarcharType;
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/types/VarcharType.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/types/VarcharType.java
index f7a7478de9..305daff8bf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/types/VarcharType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/types/VarcharType.java
@@ -54,7 +54,7 @@ public class VarcharType extends CharacterType {
 
     @Override
     public boolean acceptsType(AbstractDataType other) {
-        return other instanceof VarcharType;
+        return other instanceof VarcharType || other instanceof StringType;
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java
index 662c7838e1..ea7b435c96 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java
@@ -317,8 +317,13 @@ public class JoinUtils {
         final long rightTableId = rightHashSpec.getTableId();
         final Set<Long> leftTablePartitions = leftHashSpec.getPartitionIds();
         final Set<Long> rightTablePartitions = rightHashSpec.getPartitionIds();
-        boolean noNeedCheckColocateGroup = (leftTableId == rightTableId)
-                && (leftTablePartitions.equals(rightTablePartitions)) && 
(leftTablePartitions.size() <= 1);
+
+        // For UT or no partition is selected, getSelectedIndexId() == -1, see 
selectMaterializedView()
+        boolean hitSameIndex = (leftTableId == rightTableId)
+                && (leftHashSpec.getSelectedIndexId() != -1 && 
rightHashSpec.getSelectedIndexId() != -1)
+                && (leftHashSpec.getSelectedIndexId() == 
rightHashSpec.getSelectedIndexId());
+        boolean noNeedCheckColocateGroup = hitSameIndex && 
(leftTablePartitions.equals(rightTablePartitions))
+                && (leftTablePartitions.size() <= 1);
         ColocateTableIndex colocateIndex = Env.getCurrentColocateIndex();
         if (noNeedCheckColocateGroup
                 || (colocateIndex.isSameGroup(leftTableId, rightTableId)
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/TypeCoercionUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/TypeCoercionUtils.java
index ea443bc610..d499fe87f9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/TypeCoercionUtils.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/TypeCoercionUtils.java
@@ -227,7 +227,9 @@ public class TypeCoercionUtils {
      * cast input type if input's datatype is not same with dateType.
      */
     public static Expression castIfNotSameType(Expression input, DataType 
targetType) {
-        if (input.getDataType().equals(targetType) || 
isSubqueryAndDataTypeIsBitmap(input)) {
+        if (input.getDataType().equals(targetType) || 
isSubqueryAndDataTypeIsBitmap(input)
+                || (isVarCharOrStringType(input.getDataType())
+                        && isVarCharOrStringType(targetType))) {
             return input;
         } else {
             checkCanCastTo(input.getDataType(), targetType);
@@ -239,6 +241,10 @@ public class TypeCoercionUtils {
         return input instanceof SubqueryExpr && 
input.getDataType().isBitmapType();
     }
 
+    private static boolean isVarCharOrStringType(DataType dataType) {
+        return dataType instanceof VarcharType || dataType instanceof 
StringType;
+    }
+
     private static boolean canCastTo(DataType input, DataType target) {
         return Type.canCastTo(input.toCatalogDataType(), 
target.toCatalogDataType());
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
index 5999f33406..203146164a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
@@ -688,7 +688,11 @@ public class DistributedPlanner {
                 // check the rhs join expr type is same as distribute column
                 for (int j = 0; j < leftJoinColumnNames.size(); j++) {
                     if 
(leftJoinColumnNames.get(j).equals(distributeColumnName)) {
-                        if 
(rightExprs.get(j).getType().equals(leftDistributeColumns.get(i).getType())) {
+                        // varchar and string type don't need to check the 
length property
+                        if 
((rightExprs.get(j).getType().isVarcharOrStringType()
+                                && 
leftDistributeColumns.get(i).getType().isVarcharOrStringType())
+                                || (rightExprs.get(j).getType()
+                                        
.equals(leftDistributeColumns.get(i).getType()))) {
                             rhsJoinExprs.add(rightExprs.get(j));
                             findRhsExprs = true;
                             break;
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
index 99e9f3842f..a551fe8ba0 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
@@ -229,7 +229,7 @@ public class RuntimeFilterTest extends SSBTestBase {
         List<RuntimeFilter> filters = getRuntimeFilters(sql).get();
         Assertions.assertEquals(1, filters.size());
         checkRuntimeFilterExprs(filters, ImmutableList.of(
-                Pair.of("cast(s_name as VARCHAR(*))", "p_name")));
+                Pair.of("s_name", "p_name")));
     }
 
     private Optional<List<RuntimeFilter>> getRuntimeFilters(String sql) {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PruneOlapScanPartitionTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PruneOlapScanPartitionTest.java
index da971b0a45..a87de475b8 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PruneOlapScanPartitionTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PruneOlapScanPartitionTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.nereids.rules.rewrite.logical;
 
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.nereids.util.MemoPatternMatchSupported;
 import org.apache.doris.nereids.util.PlanChecker;
 import org.apache.doris.utframe.TestWithFeService;
@@ -130,6 +131,7 @@ class PruneOlapScanPartitionTest extends TestWithFeService 
implements MemoPatter
                 notNullSingleColumnPartitionTable,
                 multipleColumnsPartitionTable,
                 notNullMultipleColumnsPartitionTable);
+        FeConstants.runningUnitTest = true;
     }
 
     @Test
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PruneOlapScanTabletTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PruneOlapScanTabletTest.java
index 50aff4011c..68351e6a9f 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PruneOlapScanTabletTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PruneOlapScanTabletTest.java
@@ -113,6 +113,8 @@ class PruneOlapScanTabletTest implements 
MemoPatternMatchSupported {
                 result = "t1";
                 olapTable.getPartition(anyLong);
                 result = partition;
+                partition.hasData();
+                result = true;
                 partition.getIndex(anyLong);
                 result = index;
                 partition.getDistributionInfo();
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownExpressionsInHashConditionTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownExpressionsInHashConditionTest.java
index 61c4365db1..6f6adc7050 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownExpressionsInHashConditionTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownExpressionsInHashConditionTest.java
@@ -20,7 +20,6 @@ package org.apache.doris.nereids.rules.rewrite.logical;
 import org.apache.doris.nereids.NereidsPlanner;
 import org.apache.doris.nereids.parser.NereidsParser;
 import org.apache.doris.nereids.properties.PhysicalProperties;
-import org.apache.doris.nereids.trees.expressions.Cast;
 import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
 import org.apache.doris.nereids.util.MemoPatternMatchSupported;
 import org.apache.doris.nereids.util.PlanChecker;
@@ -30,7 +29,6 @@ import com.google.common.collect.ImmutableList;
 import org.junit.jupiter.api.Test;
 
 import java.util.List;
-import java.util.Set;
 
 public class PushdownExpressionsInHashConditionTest extends TestWithFeService 
implements MemoPatternMatchSupported {
     @Override
@@ -182,21 +180,4 @@ public class PushdownExpressionsInHashConditionTest 
extends TestWithFeService im
                     )
                 );
     }
-
-    @Test
-    public void testNotPushDownWhenCast() {
-        PlanChecker.from(connectContext)
-                .analyze("SELECT * FROM T1 JOIN T2 ON T1.SCORE_INT = T2.SCORE")
-                .applyTopDown(new FindHashConditionForJoin())
-                .applyTopDown(new PushdownExpressionsInHashCondition())
-                .matchesFromRoot(
-                        logicalProject(
-                                logicalJoin(
-                                        logicalOlapScan(),
-                                        logicalOlapScan()
-                                ).when(join -> 
!join.getHashJoinConjuncts().get(0)
-                                        
.<Set>collect(Cast.class::isInstance).isEmpty())
-                        )
-                );
-    }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/types/AbstractDataTypeTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/types/AbstractDataTypeTest.java
index b852bf5c9b..776b80ec47 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/types/AbstractDataTypeTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/types/AbstractDataTypeTest.java
@@ -375,7 +375,7 @@ public class AbstractDataTypeTest {
         Assertions.assertFalse(dataType.acceptsType(new 
DecimalV2Type(precision, scale)));
         Assertions.assertFalse(dataType.acceptsType(new CharType(new 
Random().nextInt())));
         Assertions.assertTrue(dataType.acceptsType(new VarcharType(new 
Random().nextInt())));
-        Assertions.assertFalse(dataType.acceptsType(StringType.INSTANCE));
+        Assertions.assertTrue(dataType.acceptsType(StringType.INSTANCE));
         Assertions.assertFalse(dataType.acceptsType(DateType.INSTANCE));
         Assertions.assertFalse(dataType.acceptsType(DateTimeType.INSTANCE));
     }
@@ -396,7 +396,7 @@ public class AbstractDataTypeTest {
         int scale = Math.min(precision, Math.abs(new Random().nextInt() % 
DecimalV2Type.MAX_SCALE));
         Assertions.assertFalse(dataType.acceptsType(new 
DecimalV2Type(precision, scale)));
         Assertions.assertFalse(dataType.acceptsType(new CharType(new 
Random().nextInt())));
-        Assertions.assertFalse(dataType.acceptsType(new VarcharType(new 
Random().nextInt())));
+        Assertions.assertTrue(dataType.acceptsType(new VarcharType(new 
Random().nextInt())));
         Assertions.assertTrue(dataType.acceptsType(StringType.INSTANCE));
         Assertions.assertFalse(dataType.acceptsType(DateType.INSTANCE));
         Assertions.assertFalse(dataType.acceptsType(DateTimeType.INSTANCE));
diff --git 
a/regression-test/suites/correctness_p0/test_bucket_shuffle_join.groovy 
b/regression-test/suites/correctness_p0/test_bucket_shuffle_join.groovy
index 429d4f18ad..3c172d82e9 100644
--- a/regression-test/suites/correctness_p0/test_bucket_shuffle_join.groovy
+++ b/regression-test/suites/correctness_p0/test_bucket_shuffle_join.groovy
@@ -91,4 +91,50 @@ suite("test_bucket_shuffle_join") {
         contains "4:VHASH JOIN\n  |  join op: INNER JOIN(BUCKET_SHUFFLE)"
         contains "2:VHASH JOIN\n  |  join op: INNER JOIN(BUCKET_SHUFFLE)"
     }
+
+    sql """ DROP TABLE IF EXISTS shuffle_join_t1 """
+    sql """ DROP TABLE IF EXISTS shuffle_join_t2 """
+
+    sql """
+        create table shuffle_join_t1 ( a varchar(10) not null )
+        ENGINE=OLAP
+        DISTRIBUTED BY HASH(a) BUCKETS 5
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1",
+        "in_memory" = "false",
+        "storage_format" = "V2"
+        );
+    """
+
+    sql """
+        create table shuffle_join_t2 ( a varchar(5) not null, b string not 
null, c char(3) not null )
+        ENGINE=OLAP
+        DISTRIBUTED BY HASH(a) BUCKETS 5
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1",
+        "in_memory" = "false",
+        "storage_format" = "V2"
+        );
+    """
+
+    sql """insert into shuffle_join_t1 values("1");"""
+    sql """insert into shuffle_join_t2 values("1","1","1");"""
+
+    explain {
+        sql("select * from shuffle_join_t1 t1 left join shuffle_join_t2 t2 on 
t1.a = t2.a;")
+        contains "BUCKET_SHUFFLE"
+    }
+
+    explain {
+        sql("select * from shuffle_join_t1 t1 left join shuffle_join_t2 t2 on 
t1.a = t2.b;")
+        contains "BUCKET_SHUFFLE"
+    }
+
+    explain {
+        sql("select * from shuffle_join_t1 t1 left join shuffle_join_t2 t2 on 
t1.a = t2.c;")
+        notContains "BUCKET_SHUFFLE"
+    }
+
+    sql """ DROP TABLE IF EXISTS shuffle_join_t1 """
+    sql """ DROP TABLE IF EXISTS shuffle_join_t2 """
 }
diff --git a/regression-test/suites/nereids_p0/join/bucket_shuffle_join.groovy 
b/regression-test/suites/nereids_p0/join/bucket_shuffle_join.groovy
index 62408acd8f..5a48d2a99f 100644
--- a/regression-test/suites/nereids_p0/join/bucket_shuffle_join.groovy
+++ b/regression-test/suites/nereids_p0/join/bucket_shuffle_join.groovy
@@ -21,4 +21,62 @@ suite("bucket-shuffle-join") {
     order_qt_test_bucket """
     select * from test_bucket_shuffle_join where rectime="2021-12-01 00:00:00" 
and id in (select k1 from test_join where k1 in (1,2))
     """
+
+    sql """ DROP TABLE IF EXISTS shuffle_join_t1 """
+    sql """ DROP TABLE IF EXISTS shuffle_join_t2 """
+
+    sql """
+        create table shuffle_join_t1 ( a varchar(10) not null )
+        ENGINE=OLAP
+        DISTRIBUTED BY HASH(a) BUCKETS 5
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1",
+        "in_memory" = "false",
+        "storage_format" = "V2"
+        );
+    """
+
+    sql """
+        create table shuffle_join_t2 ( a varchar(5) not null, b string not 
null, c char(3) not null )
+        ENGINE=OLAP
+        DISTRIBUTED BY HASH(a) BUCKETS 5
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1",
+        "in_memory" = "false",
+        "storage_format" = "V2"
+        );
+    """
+
+    sql """insert into shuffle_join_t1 values("1");"""
+    sql """insert into shuffle_join_t1 values("1");"""
+    sql """insert into shuffle_join_t1 values("1");"""
+    sql """insert into shuffle_join_t1 values("1");"""
+    sql """insert into shuffle_join_t2 values("1","1","1");"""
+    sql """insert into shuffle_join_t2 values("1","1","1");"""
+    sql """insert into shuffle_join_t2 values("1","1","1");"""
+    sql """insert into shuffle_join_t2 values("1","1","1");"""
+
+    sql """analyze table shuffle_join_t1;"""
+    sql """analyze table shuffle_join_t2;"""
+
+    Thread.sleep(2000)
+
+    explain {
+        sql("select * from shuffle_join_t1 t1 left join shuffle_join_t2 t2 on 
t1.a = t2.a;")
+        contains "BUCKET_SHUFFLE"
+    }
+
+    explain {
+        sql("select * from shuffle_join_t1 t1 left join shuffle_join_t2 t2 on 
t1.a = t2.b;")
+        contains "BUCKET_SHUFFLE"
+    }
+
+    explain {
+        sql("select * from shuffle_join_t1 t1 left join shuffle_join_t2 t2 on 
t1.a = t2.c;")
+        contains "BUCKET_SHUFFLE"
+        contains "BUCKET_SHFFULE_HASH_PARTITIONED: expr_cast(c as VARCHAR(*))"
+    }
+
+    sql """ DROP TABLE IF EXISTS shuffle_join_t1 """
+    sql """ DROP TABLE IF EXISTS shuffle_join_t2 """
 }
diff --git 
a/regression-test/suites/nereids_p0/join/colocate_join_with_rollup.groovy 
b/regression-test/suites/nereids_p0/join/colocate_join_with_rollup.groovy
new file mode 100644
index 0000000000..73ffd09b4f
--- /dev/null
+++ b/regression-test/suites/nereids_p0/join/colocate_join_with_rollup.groovy
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("colocate_join_with_rollup", "nereids_p0") {
+    sql "SET enable_nereids_planner=true"
+    sql "SET enable_fallback_to_original_planner=false" 
+
+    sql """ DROP TABLE IF EXISTS test_query_colocate1 """
+    sql """ DROP TABLE IF EXISTS test_query_colocate2 """
+    sql """ DROP TABLE IF EXISTS test_query_no_colocate """
+
+    sql """
+        CREATE TABLE `test_query_colocate1` (
+              `datekey` int(11) NULL,
+              `rollup_1_condition` int null,
+              `rollup_2_condition` int null,
+              `sum_col1` bigint(20) SUM NULL,
+              `sum_col2` bigint(20) SUM NULL
+            ) ENGINE=OLAP
+            AGGREGATE KEY(`datekey`,`rollup_1_condition`,`rollup_2_condition`)
+            COMMENT ""
+            PARTITION BY RANGE(`datekey`)
+            (PARTITION p20220102 VALUES [("20220101"), ("20220102")),
+            PARTITION p20220103 VALUES [("20220102"), ("20220103")))
+            DISTRIBUTED BY HASH(`datekey`) BUCKETS 1
+            rollup (
+            rollup_1(datekey, sum_col1),
+            rollup_2(datekey, sum_col2)
+            )
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "in_memory" = "false",
+                "storage_format" = "V2",
+                "colocate_with" = "group1"
+            );
+    """
+
+    sql """
+        CREATE TABLE `test_query_colocate2` (
+              `datekey` int(11) NULL,
+              `rollup_1_condition` int null,
+              `rollup_2_condition` int null,
+              `sum_col1` bigint(20) SUM NULL,
+              `sum_col2` bigint(20) SUM NULL
+            ) ENGINE=OLAP
+            AGGREGATE KEY(`datekey`,`rollup_1_condition`,`rollup_2_condition`)
+            COMMENT ""
+            PARTITION BY RANGE(`datekey`)
+            (PARTITION p20220102 VALUES [("20220101"), ("20220102")),
+            PARTITION p20220103 VALUES [("20220102"), ("20220103")))
+            DISTRIBUTED BY HASH(`datekey`) BUCKETS 1
+            rollup (
+            rollup_1(datekey, sum_col1),
+            rollup_2(datekey, sum_col2)
+            )
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "in_memory" = "false",
+                "storage_format" = "V2",
+                "colocate_with" = "group1"
+            );
+    """
+
+    sql """
+        CREATE TABLE `test_query_no_colocate` (
+              `datekey` int(11) NULL,
+              `rollup_1_condition` int null,
+              `rollup_2_condition` int null,
+              `sum_col1` bigint(20) SUM NULL,
+              `sum_col2` bigint(20) SUM NULL
+            ) ENGINE=OLAP
+            AGGREGATE KEY(`datekey`,`rollup_1_condition`,`rollup_2_condition`)
+            COMMENT ""
+            PARTITION BY RANGE(`datekey`)
+            (PARTITION p20220102 VALUES [("20220101"), ("20220110")),
+            PARTITION p20220103 VALUES [("20220110"), ("20220120")))
+            DISTRIBUTED BY HASH(`datekey`) BUCKETS 5
+            rollup (
+            rollup_1(datekey, sum_col1),
+            rollup_2(datekey, sum_col2)
+            )
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "in_memory" = "false",
+                "storage_format" = "V2"
+            );
+    """
+
+    sql """insert into test_query_colocate1 values
+            (20220101, 102, 200, 200, 100),
+            (20220101, 101, 200, 200, 100),
+            (20220101, 102, 202, 200, 100),
+            (20220101, 101, 202, 200, 100);"""
+
+    sql """insert into test_query_colocate2 values
+            (20220101, 102, 200, 200, 100),
+            (20220101, 101, 200, 200, 100),
+            (20220101, 102, 202, 200, 100),
+            (20220101, 101, 202, 200, 100);"""
+
+    sql """insert into test_query_no_colocate values
+            (20220101, 102, 200, 200, 100),
+            (20220102, 101, 200, 200, 100),
+            (20220103, 102, 202, 200, 100),
+            (20220104, 101, 202, 200, 100),
+            (20220105, 102, 200, 200, 100),
+            (20220106, 101, 200, 200, 100),
+            (20220107, 102, 202, 200, 100),
+            (20220108, 101, 202, 200, 100);"""
+
+    explain {
+        sql("""select sum_col1,sum_col2 
+            from 
+            (select datekey,sum(sum_col1) as sum_col1 from 
test_query_colocate1 where datekey=20220101 group by datekey) t1
+            left join
+            (select datekey,sum(sum_col2) as sum_col2 from 
test_query_colocate1 where datekey=20220101 group by datekey) t2
+            on t1.datekey = t2.datekey""")
+        contains "COLOCATE"
+    }
+
+    explain {
+        sql("""select sum_col1,sum_col2 
+            from 
+            (select datekey,sum(sum_col1) as sum_col1 from 
test_query_colocate1 where datekey=20220101 group by datekey) t1
+            left join
+            (select datekey,sum(sum_col1) as sum_col2 from 
test_query_colocate2 where datekey=20220101 group by datekey) t2
+            on t1.datekey = t2.datekey""")
+        contains "COLOCATE"
+    }
+
+    explain {
+        sql("""select sum_col1,sum_col2 
+            from 
+            (select datekey,sum(sum_col1) as sum_col1 from 
test_query_colocate1 where datekey=20220101 group by datekey) t1
+            left join
+            (select datekey,sum(sum_col2) as sum_col2 from 
test_query_colocate2 where datekey=20220101 group by datekey) t2
+            on t1.datekey = t2.datekey""")
+        contains "COLOCATE"
+    }
+
+    explain {
+        // hit same rollup, colocate
+        sql("""select sum_col1,sum_col2 
+            from 
+            (select datekey,sum(sum_col1) as sum_col1 from 
test_query_no_colocate group by datekey) t1
+            left join
+            (select datekey,sum(sum_col1) as sum_col2 from 
test_query_no_colocate group by datekey) t2
+            on t1.datekey = t2.datekey""")
+        contains "COLOCATE"
+    }
+
+    explain {
+        // hit same base table, colocate
+        sql("""select * 
+            from 
+            (select datekey from test_query_no_colocate ) t1
+            left join
+            (select datekey from test_query_no_colocate ) t2
+            on t1.datekey = t2.datekey""")
+        contains "COLOCATE"
+    }
+
+    explain {
+        // hit different rollup, no colocate
+        sql("""select sum_col1,sum_col2 
+                from 
+                (select datekey,sum(sum_col1) as sum_col1 from 
test_query_no_colocate group by datekey) t1
+                left join
+                (select datekey,sum(sum_col2) as sum_col2 from 
test_query_no_colocate group by datekey) t2
+                on t1.datekey = t2.datekey""")
+        notContains "COLOCATE"
+    }
+
+    sql """ DROP TABLE IF EXISTS test_query_colocate1 """
+    sql """ DROP TABLE IF EXISTS test_query_colocate2 """
+    sql """ DROP TABLE IF EXISTS test_query_no_colocate """
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to