morrySnow commented on code in PR #61248:
URL: https://github.com/apache/doris/pull/61248#discussion_r3264087684


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DistinctAggregateRewriter.java:
##########
@@ -130,6 +144,103 @@ boolean shouldUseMultiDistinct(LogicalAggregate<? extends 
Plan> aggregate) {
                 && dstNdv > inputRows * 
AggregateUtils.HIGH_CARDINALITY_THRESHOLD;
     }
 
+    private boolean isDistinctKeySatisfyDistribution(LogicalAggregate<? 
extends Plan> aggregate) {

Review Comment:
   Please add a unit test (UT) for this function.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DistinctAggregateRewriter.java:
##########
@@ -130,6 +144,103 @@ boolean shouldUseMultiDistinct(LogicalAggregate<? extends 
Plan> aggregate) {
                 && dstNdv > inputRows * 
AggregateUtils.HIGH_CARDINALITY_THRESHOLD;
     }
 
+    private boolean isDistinctKeySatisfyDistribution(LogicalAggregate<? 
extends Plan> aggregate) {
+        DistinctDistributionInfo info = 
resolveDistinctDistributionInfo(aggregate);
+        if (info == null) {
+            return false;
+        }
+        Set<String> distinctColumnNames = new HashSet<>();
+        for (SlotReference slot : info.distinctSlots) {
+            if (!slot.getOriginalColumn().isPresent()) {
+                return false;
+            }
+            
distinctColumnNames.add(slot.getOriginalColumn().get().getName().toLowerCase());
+        }
+        DistributionInfo distributionInfo = 
info.table.getDefaultDistributionInfo();
+        if (!(distributionInfo instanceof HashDistributionInfo)) {
+            return false;
+        }
+        List<Column> distributionColumns = ((HashDistributionInfo) 
distributionInfo).getDistributionColumns();
+        if (distributionColumns.isEmpty()) {
+            return false;
+        }
+        for (Column column : distributionColumns) {
+            if (!distinctColumnNames.contains(column.getName().toLowerCase())) 
{
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private DistinctDistributionInfo 
resolveDistinctDistributionInfo(LogicalAggregate<? extends Plan> aggregate) {

Review Comment:
   Please add a comment explaining which situations this method can handle.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DistinctAggregateRewriter.java:
##########
@@ -130,6 +144,103 @@ boolean shouldUseMultiDistinct(LogicalAggregate<? extends 
Plan> aggregate) {
                 && dstNdv > inputRows * 
AggregateUtils.HIGH_CARDINALITY_THRESHOLD;
     }
 
+    private boolean isDistinctKeySatisfyDistribution(LogicalAggregate<? 
extends Plan> aggregate) {
+        DistinctDistributionInfo info = 
resolveDistinctDistributionInfo(aggregate);
+        if (info == null) {
+            return false;
+        }
+        Set<String> distinctColumnNames = new HashSet<>();
+        for (SlotReference slot : info.distinctSlots) {
+            if (!slot.getOriginalColumn().isPresent()) {
+                return false;
+            }
+            
distinctColumnNames.add(slot.getOriginalColumn().get().getName().toLowerCase());
+        }
+        DistributionInfo distributionInfo = 
info.table.getDefaultDistributionInfo();
+        if (!(distributionInfo instanceof HashDistributionInfo)) {
+            return false;
+        }
+        List<Column> distributionColumns = ((HashDistributionInfo) 
distributionInfo).getDistributionColumns();
+        if (distributionColumns.isEmpty()) {
+            return false;
+        }
+        for (Column column : distributionColumns) {
+            if (!distinctColumnNames.contains(column.getName().toLowerCase())) 
{
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private DistinctDistributionInfo 
resolveDistinctDistributionInfo(LogicalAggregate<? extends Plan> aggregate) {
+        Set<Expression> distinctArgs = aggregate.getDistinctArguments();
+        if (distinctArgs.isEmpty()) {
+            return null;
+        }
+        Set<SlotReference> distinctSlots = new HashSet<>();
+        for (Expression expression : distinctArgs) {
+            if (!(expression instanceof SlotReference)) {
+                return null;
+            }
+            distinctSlots.add((SlotReference) expression);
+        }
+        Plan child = aggregate.child();
+        while (child instanceof LogicalProject || child instanceof 
LogicalFilter) {
+            if (child instanceof LogicalProject) {
+                LogicalProject<? extends Plan> project = (LogicalProject<? 
extends Plan>) child;
+                Map<Slot, Expression> projectExprMap = new HashMap<>();
+                for (NamedExpression namedExpression : project.getProjects()) {
+                    Expression projectExpr = namedExpression;
+                    if (namedExpression instanceof Alias) {
+                        projectExpr = ((Alias) namedExpression).child();
+                    }
+                    projectExprMap.put(namedExpression.toSlot(), projectExpr);
+                }
+                Set<SlotReference> replaced = new HashSet<>();
+                for (SlotReference slot : distinctSlots) {
+                    Expression projectExpr = projectExprMap.get(slot);
+                    if (!(projectExpr instanceof SlotReference)) {
+                        return null;
+                    }
+                    replaced.add((SlotReference) projectExpr);
+                }
+                distinctSlots = replaced;
+                child = project.child();
+                continue;
+            }
+            child = ((LogicalFilter<? extends Plan>) child).child();
+        }
+        if (!(child instanceof LogicalOlapScan)) {
+            return null;
+        }
+        LogicalOlapScan scan = (LogicalOlapScan) child;
+        OlapTable olapTable = scan.getTable();
+        if (olapTable == null) {
+            return null;
+        }
+        if (olapTable.getPartitionInfo().getType() != 
PartitionType.UNPARTITIONED
+                && scan.getSelectedPartitionIds().size() > 1) {
+            return null;

Review Comment:
   A colocate table with multiple partitions also satisfies the distribution.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DistinctAggregateRewriter.java:
##########
@@ -130,6 +144,103 @@ boolean shouldUseMultiDistinct(LogicalAggregate<? extends 
Plan> aggregate) {
                 && dstNdv > inputRows * 
AggregateUtils.HIGH_CARDINALITY_THRESHOLD;
     }
 
+    private boolean isDistinctKeySatisfyDistribution(LogicalAggregate<? 
extends Plan> aggregate) {
+        DistinctDistributionInfo info = 
resolveDistinctDistributionInfo(aggregate);
+        if (info == null) {
+            return false;
+        }
+        Set<String> distinctColumnNames = new HashSet<>();
+        for (SlotReference slot : info.distinctSlots) {
+            if (!slot.getOriginalColumn().isPresent()) {
+                return false;
+            }
+            
distinctColumnNames.add(slot.getOriginalColumn().get().getName().toLowerCase());
+        }
+        DistributionInfo distributionInfo = 
info.table.getDefaultDistributionInfo();
+        if (!(distributionInfo instanceof HashDistributionInfo)) {
+            return false;
+        }
+        List<Column> distributionColumns = ((HashDistributionInfo) 
distributionInfo).getDistributionColumns();
+        if (distributionColumns.isEmpty()) {
+            return false;
+        }
+        for (Column column : distributionColumns) {
+            if (!distinctColumnNames.contains(column.getName().toLowerCase())) 
{
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private DistinctDistributionInfo 
resolveDistinctDistributionInfo(LogicalAggregate<? extends Plan> aggregate) {
+        Set<Expression> distinctArgs = aggregate.getDistinctArguments();
+        if (distinctArgs.isEmpty()) {
+            return null;
+        }
+        Set<SlotReference> distinctSlots = new HashSet<>();
+        for (Expression expression : distinctArgs) {
+            if (!(expression instanceof SlotReference)) {
+                return null;
+            }
+            distinctSlots.add((SlotReference) expression);
+        }
+        Plan child = aggregate.child();
+        while (child instanceof LogicalProject || child instanceof 
LogicalFilter) {
+            if (child instanceof LogicalProject) {
+                LogicalProject<? extends Plan> project = (LogicalProject<? 
extends Plan>) child;
+                Map<Slot, Expression> projectExprMap = new HashMap<>();
+                for (NamedExpression namedExpression : project.getProjects()) {
+                    Expression projectExpr = namedExpression;
+                    if (namedExpression instanceof Alias) {
+                        projectExpr = ((Alias) namedExpression).child();
+                    }
+                    projectExprMap.put(namedExpression.toSlot(), projectExpr);
+                }
+                Set<SlotReference> replaced = new HashSet<>();
+                for (SlotReference slot : distinctSlots) {
+                    Expression projectExpr = projectExprMap.get(slot);
+                    if (!(projectExpr instanceof SlotReference)) {
+                        return null;
+                    }
+                    replaced.add((SlotReference) projectExpr);
+                }
+                distinctSlots = replaced;
+                child = project.child();
+                continue;
+            }
+            child = ((LogicalFilter<? extends Plan>) child).child();
+        }
+        if (!(child instanceof LogicalOlapScan)) {
+            return null;
+        }
+        LogicalOlapScan scan = (LogicalOlapScan) child;
+        OlapTable olapTable = scan.getTable();
+        if (olapTable == null) {
+            return null;
+        }
+        if (olapTable.getPartitionInfo().getType() != 
PartitionType.UNPARTITIONED
+                && scan.getSelectedPartitionIds().size() > 1) {
+            return null;
+        }
+        for (SlotReference slot : distinctSlots) {
+            if (!slot.getOriginalTable().isPresent()
+                    || slot.getOriginalTable().get() != olapTable) {
+                return null;
+            }
+        }
+        return new DistinctDistributionInfo(olapTable, distinctSlots);
+    }
+
+    private static class DistinctDistributionInfo {
+        private final OlapTable table;

Review Comment:
   Why use OlapTable here? Maybe DistributionInfo would be better.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DistinctAggregateRewriter.java:
##########
@@ -130,6 +144,103 @@ boolean shouldUseMultiDistinct(LogicalAggregate<? extends 
Plan> aggregate) {
                 && dstNdv > inputRows * 
AggregateUtils.HIGH_CARDINALITY_THRESHOLD;
     }
 
+    private boolean isDistinctKeySatisfyDistribution(LogicalAggregate<? 
extends Plan> aggregate) {
+        DistinctDistributionInfo info = 
resolveDistinctDistributionInfo(aggregate);
+        if (info == null) {
+            return false;
+        }
+        Set<String> distinctColumnNames = new HashSet<>();
+        for (SlotReference slot : info.distinctSlots) {
+            if (!slot.getOriginalColumn().isPresent()) {

Review Comment:
   Why do we need to get the original column?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DistinctAggregateRewriter.java:
##########
@@ -70,6 +80,7 @@
  */
 public class DistinctAggregateRewriter implements RewriteRuleFactory {
     public static final DistinctAggregateRewriter INSTANCE = new 
DistinctAggregateRewriter();
+    private static final double MULTI_DISTINCT_GBY_PER_INSTANCE_THRESHOLD = 
30.0;

Review Comment:
   Is this static variable useless?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DistinctAggregateRewriter.java:
##########
@@ -70,6 +80,7 @@
  */
 public class DistinctAggregateRewriter implements RewriteRuleFactory {
     public static final DistinctAggregateRewriter INSTANCE = new 
DistinctAggregateRewriter();
+    private static final double MULTI_DISTINCT_GBY_PER_INSTANCE_THRESHOLD = 
30.0;

Review Comment:
   Please add a comment explaining why the value is 30.0.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DistinctAggregateRewriter.java:
##########
@@ -130,6 +144,103 @@ boolean shouldUseMultiDistinct(LogicalAggregate<? extends 
Plan> aggregate) {
                 && dstNdv > inputRows * 
AggregateUtils.HIGH_CARDINALITY_THRESHOLD;
     }
 
+    private boolean isDistinctKeySatisfyDistribution(LogicalAggregate<? 
extends Plan> aggregate) {
+        DistinctDistributionInfo info = 
resolveDistinctDistributionInfo(aggregate);
+        if (info == null) {
+            return false;
+        }
+        Set<String> distinctColumnNames = new HashSet<>();
+        for (SlotReference slot : info.distinctSlots) {
+            if (!slot.getOriginalColumn().isPresent()) {
+                return false;
+            }
+            
distinctColumnNames.add(slot.getOriginalColumn().get().getName().toLowerCase());
+        }
+        DistributionInfo distributionInfo = 
info.table.getDefaultDistributionInfo();
+        if (!(distributionInfo instanceof HashDistributionInfo)) {
+            return false;
+        }
+        List<Column> distributionColumns = ((HashDistributionInfo) 
distributionInfo).getDistributionColumns();
+        if (distributionColumns.isEmpty()) {
+            return false;
+        }
+        for (Column column : distributionColumns) {
+            if (!distinctColumnNames.contains(column.getName().toLowerCase())) 
{
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private DistinctDistributionInfo 
resolveDistinctDistributionInfo(LogicalAggregate<? extends Plan> aggregate) {
+        Set<Expression> distinctArgs = aggregate.getDistinctArguments();
+        if (distinctArgs.isEmpty()) {
+            return null;
+        }
+        Set<SlotReference> distinctSlots = new HashSet<>();
+        for (Expression expression : distinctArgs) {
+            if (!(expression instanceof SlotReference)) {
+                return null;
+            }
+            distinctSlots.add((SlotReference) expression);
+        }
+        Plan child = aggregate.child();
+        while (child instanceof LogicalProject || child instanceof 
LogicalFilter) {
+            if (child instanceof LogicalProject) {
+                LogicalProject<? extends Plan> project = (LogicalProject<? 
extends Plan>) child;
+                Map<Slot, Expression> projectExprMap = new HashMap<>();
+                for (NamedExpression namedExpression : project.getProjects()) {
+                    Expression projectExpr = namedExpression;
+                    if (namedExpression instanceof Alias) {
+                        projectExpr = ((Alias) namedExpression).child();
+                    }
+                    projectExprMap.put(namedExpression.toSlot(), projectExpr);
+                }
+                Set<SlotReference> replaced = new HashSet<>();
+                for (SlotReference slot : distinctSlots) {
+                    Expression projectExpr = projectExprMap.get(slot);
+                    if (!(projectExpr instanceof SlotReference)) {
+                        return null;
+                    }
+                    replaced.add((SlotReference) projectExpr);
+                }
+                distinctSlots = replaced;
+                child = project.child();
+                continue;
+            }
+            child = ((LogicalFilter<? extends Plan>) child).child();
+        }
+        if (!(child instanceof LogicalOlapScan)) {
+            return null;
+        }
+        LogicalOlapScan scan = (LogicalOlapScan) child;
+        OlapTable olapTable = scan.getTable();
+        if (olapTable == null) {
+            return null;
+        }

Review Comment:
   So after this PR, does this rule only support an OLAP table as the child of
the aggregate?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DistinctAggregateRewriter.java:
##########
@@ -130,6 +144,103 @@ boolean shouldUseMultiDistinct(LogicalAggregate<? extends 
Plan> aggregate) {
                 && dstNdv > inputRows * 
AggregateUtils.HIGH_CARDINALITY_THRESHOLD;
     }
 
+    private boolean isDistinctKeySatisfyDistribution(LogicalAggregate<? 
extends Plan> aggregate) {
+        DistinctDistributionInfo info = 
resolveDistinctDistributionInfo(aggregate);
+        if (info == null) {
+            return false;
+        }
+        Set<String> distinctColumnNames = new HashSet<>();
+        for (SlotReference slot : info.distinctSlots) {
+            if (!slot.getOriginalColumn().isPresent()) {
+                return false;
+            }
+            
distinctColumnNames.add(slot.getOriginalColumn().get().getName().toLowerCase());
+        }
+        DistributionInfo distributionInfo = 
info.table.getDefaultDistributionInfo();
+        if (!(distributionInfo instanceof HashDistributionInfo)) {
+            return false;
+        }
+        List<Column> distributionColumns = ((HashDistributionInfo) 
distributionInfo).getDistributionColumns();
+        if (distributionColumns.isEmpty()) {
+            return false;
+        }
+        for (Column column : distributionColumns) {
+            if (!distinctColumnNames.contains(column.getName().toLowerCase())) 
{
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private DistinctDistributionInfo 
resolveDistinctDistributionInfo(LogicalAggregate<? extends Plan> aggregate) {

Review Comment:
   Please add a unit test (UT) for this function.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to