924060929 commented on code in PR #54079:
URL: https://github.com/apache/doris/pull/54079#discussion_r2300232811


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CheckAfterRewrite.java:
##########
@@ -111,8 +110,8 @@ private void checkAllSlotReferenceFromChildren(Plan plan) {
         notFromChildren = removeValidSlotsNotFromChildren(notFromChildren, 
childrenOutput);
         if (!notFromChildren.isEmpty()) {
             if (plan.arity() != 0 && plan.child(0) instanceof 
LogicalAggregate) {
-                throw new AnalysisException(String.format("%s not in 
aggregate's output", notFromChildren
-                        
.stream().map(NamedExpression::getName).collect(Collectors.joining(", "))));
+                // throw new AnalysisException(String.format("%s not in 
aggregate's output", notFromChildren
+                //         
.stream().map(NamedExpression::getName).collect(Collectors.joining(", "))));

Review Comment:
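   Should this really be commented out? Please either restore the `AnalysisException` check or remove it entirely and explain why it is no longer needed.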



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java:
##########
@@ -302,161 +255,6 @@ public List<Rule> buildRules() {
                         LogicalFileScan fileScan = project.child();
                         return storageLayerAggregate(agg, project, fileScan, 
ctx.cascadesContext);
                     })

Review Comment:
   Please move the `COUNT_ON_INDEX_WITHOUT_PROJECT` rule into a new file.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAgg.java:
##########
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.implementation;
+
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.rules.exploration.OneExplorationRuleFactory;
+import org.apache.doris.nereids.trees.expressions.AggregateExpression;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import 
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
+import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam;
+import org.apache.doris.nereids.trees.expressions.functions.agg.AggregatePhase;
+import org.apache.doris.nereids.trees.plans.AggMode;
+import org.apache.doris.nereids.trees.plans.AggPhase;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.algebra.Aggregate;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
+import org.apache.doris.nereids.util.AggregateUtils;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.statistics.Statistics;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import java.util.List;
+import java.util.Map;
+
+/**SplitAgg
+ * only process agg without distinct function, split Agg into 2 phase: local 
agg and global agg
+ * */
+public class SplitAgg extends OneExplorationRuleFactory {

Review Comment:
   Please rename this class to `SplitAggWithoutDistinct`. Also, should it extend `OneImplementationRuleFactory` instead? This rule generates physical plans, so it looks like an implementation rule rather than an exploration rule.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java:
##########
@@ -505,24 +507,18 @@ public enum RuleType {
     
STORAGE_LAYER_AGGREGATE_MINMAX_ON_UNIQUE_WITHOUT_PROJECT(RuleTypeClass.IMPLEMENTATION),
     COUNT_ON_INDEX(RuleTypeClass.IMPLEMENTATION),
     COUNT_ON_INDEX_WITHOUT_PROJECT(RuleTypeClass.IMPLEMENTATION),
-    ONE_PHASE_AGGREGATE_WITHOUT_DISTINCT(RuleTypeClass.IMPLEMENTATION),
-    TWO_PHASE_AGGREGATE_WITHOUT_DISTINCT(RuleTypeClass.IMPLEMENTATION),
-    
TWO_PHASE_AGGREGATE_WITH_COUNT_DISTINCT_MULTI(RuleTypeClass.IMPLEMENTATION),
-    
THREE_PHASE_AGGREGATE_WITH_COUNT_DISTINCT_MULTI(RuleTypeClass.IMPLEMENTATION),
     TWO_PHASE_AGGREGATE_WITH_DISTINCT(RuleTypeClass.IMPLEMENTATION),
-    ONE_PHASE_AGGREGATE_SINGLE_DISTINCT_TO_MULTI(RuleTypeClass.IMPLEMENTATION),
-    TWO_PHASE_AGGREGATE_SINGLE_DISTINCT_TO_MULTI(RuleTypeClass.IMPLEMENTATION),
-    TWO_PHASE_AGGREGATE_WITH_MULTI_DISTINCT(RuleTypeClass.IMPLEMENTATION),
-    THREE_PHASE_AGGREGATE_WITH_DISTINCT(RuleTypeClass.IMPLEMENTATION),
-    FOUR_PHASE_AGGREGATE_WITH_DISTINCT(RuleTypeClass.IMPLEMENTATION),
-    
FOUR_PHASE_AGGREGATE_WITH_DISTINCT_WITH_FULL_DISTRIBUTE(RuleTypeClass.IMPLEMENTATION),

Review Comment:
   Please skip unknown rule names set via `SessionVariable.getDisableNereidsRules` (and similar settings) instead of rejecting them. Users may still have the removed legacy rules configured in their session variables.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggMultiPhase.java:
##########
@@ -0,0 +1,366 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.implementation;
+
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.pattern.MatchingContext;
+import org.apache.doris.nereids.properties.ChildrenPropertiesRegulator;
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.rules.exploration.ExplorationRuleFactory;
+import org.apache.doris.nereids.trees.expressions.AggregateExpression;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.Cast;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.Mod;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import 
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
+import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam;
+import org.apache.doris.nereids.trees.expressions.functions.agg.AggregatePhase;
+import org.apache.doris.nereids.trees.expressions.functions.agg.Count;
+import org.apache.doris.nereids.trees.expressions.functions.agg.Sum0;
+import 
org.apache.doris.nereids.trees.expressions.functions.agg.SupportMultiDistinct;
+import org.apache.doris.nereids.trees.expressions.functions.scalar.XxHash32;
+import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral;
+import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral;
+import org.apache.doris.nereids.trees.plans.AggMode;
+import org.apache.doris.nereids.trees.plans.AggPhase;
+import org.apache.doris.nereids.trees.plans.GroupPlan;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
+import org.apache.doris.nereids.types.DataType;
+import org.apache.doris.nereids.types.SmallIntType;
+import org.apache.doris.nereids.types.StringType;
+import org.apache.doris.nereids.types.TinyIntType;
+import org.apache.doris.nereids.util.AggregateUtils;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.nereids.util.TypeCoercionUtils;
+import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.statistics.Statistics;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**SplitAggMultiPhase
+ * only process agg with distinct function, split Agg into multi phase
+ * */
+public class SplitAggMultiPhase extends SplitAggBaseRule implements 
ExplorationRuleFactory {
+    public static final SplitAggMultiPhase INSTANCE = new SplitAggMultiPhase();
+    private static final String SALT_EXPR = "saltExpr";
+
+    @Override
+    public List<Rule> buildRules() {
+        return ImmutableList.of(
+                logicalAggregate()
+                        .when(agg -> !agg.getGroupByExpressions().isEmpty())
+                        .when(agg -> agg.getDistinctArguments().size() == 1 || 
agg.distinctFuncNum() == 1)
+                        .thenApplyMulti(this::rewrite)
+                        .toRule(RuleType.SPLIT_AGG_MULTI_PHASE)
+        );
+    }
+
+    private List<Plan> rewrite(MatchingContext<LogicalAggregate<GroupPlan>> 
ctx) {
+        LogicalAggregate<? extends Plan> aggregate = ctx.root;
+        if (aggregate.canSkewRewrite()) {
+            return ImmutableList.of(aggSkewRewrite(aggregate, 
ctx.cascadesContext));
+        } else {
+            if (twoPlusOneBetterThanTwoPlusTwo(aggregate)) {
+                return ImmutableList.<Plan>builder()
+                        .addAll(splitToTwoPlusOnePhase(aggregate))
+                        .addAll(splitToOnePlusTwoPhase(aggregate))
+                        .build();
+            } else {
+                return ImmutableList.<Plan>builder()
+                        .addAll(splitToTwoPlusTwoPhase(aggregate))
+                        .build();
+            }
+        }
+    }
+
+    /**
+     * select count(distinct a) group by b (deduplicated agg hashShuffle by 
group by key b)
+     * splitToTwoPlusOnePhase:
+     *   agg(group by b, count(a); distinct global)
+     *     +--agg(group by a,b; global)
+     *       +--hashShuffle(b)
+     *         +--agg(group by a,b; local)
+     *   agg(group by b, count(a); distinct global)
+     *     +--agg(group by a,b; global)
+     *       +--hashShuffle(b)
+     */
+    private List<Plan> splitToTwoPlusOnePhase(LogicalAggregate<? extends Plan> 
aggregate) {
+        ImmutableList.Builder<Plan> builder = ImmutableList.builder();
+        if (aggregate.supportAggregatePhase(AggregatePhase.THREE)) {
+            Set<NamedExpression> localAggGroupBySet = 
AggregateUtils.getAllKeySet(aggregate);
+            Map<AggregateFunction, Alias> middleAggFuncToAlias = new 
LinkedHashMap<>();
+            Plan middleAgg = splitDeduplicateTwoPhase(aggregate, 
middleAggFuncToAlias,
+                    aggregate.getGroupByExpressions(), localAggGroupBySet);
+            builder.add(splitDistinctOnePhase(aggregate, middleAggFuncToAlias, 
middleAgg));
+        }
+        if (aggregate.supportAggregatePhase(AggregatePhase.TWO)) {
+            Map<AggregateFunction, Alias> localAggFuncToAlias = new 
HashMap<>();
+            Plan localAgg = splitToOnePhase(aggregate, 
Utils.fastToImmutableList(aggregate.getGroupByExpressions()),
+                    localAggFuncToAlias);
+            builder.add(splitDistinctOnePhase(aggregate, localAggFuncToAlias, 
localAgg));
+        }
+        return builder.build();
+    }
+
+    private Plan splitToOnePhase(LogicalAggregate<? extends Plan> aggregate,

Review Comment:
   Please add a comment showing the plan-shape transformation (the plan tree before and after this split).



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggBaseRule.java:
##########
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.implementation;
+
+import org.apache.doris.nereids.trees.expressions.AggregateExpression;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import 
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
+import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam;
+import org.apache.doris.nereids.trees.expressions.functions.agg.Count;
+import org.apache.doris.nereids.trees.expressions.literal.NullLiteral;
+import org.apache.doris.nereids.trees.plans.AggMode;
+import org.apache.doris.nereids.trees.plans.AggPhase;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
+import org.apache.doris.nereids.types.TinyIntType;
+import org.apache.doris.nereids.util.AggregateUtils;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.nereids.util.Utils;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**SplitAggRule*/
+public abstract class SplitAggBaseRule {
+    protected PhysicalHashAggregate<? extends Plan> 
splitDeduplicateOnePhase(LogicalAggregate<? extends Plan> aggregate,
+            Set<NamedExpression> localAggGroupBySet, AggregateParam 
inputToBufferParam, AggregateParam paramForAggFunc,
+            Map<AggregateFunction, Alias> localAggFunctionToAlias, Plan child, 
List<Expression> partitionExpressions) {
+        aggregate.getAggregateFunctions().stream()
+                .filter(aggFunc -> !aggFunc.isDistinct())
+                .collect(Collectors.toMap(
+                        expr -> expr,
+                        expr -> {
+                            AggregateExpression localAggExpr = new 
AggregateExpression(expr, paramForAggFunc);
+                            return new Alias(localAggExpr);
+                        },
+                        (existing, replacement) -> existing,
+                        () -> localAggFunctionToAlias
+                ));
+        List<NamedExpression> localAggOutput = 
ImmutableList.<NamedExpression>builder()
+                .addAll(localAggGroupBySet)
+                .addAll(localAggFunctionToAlias.values())
+                .build();
+        List<Expression> localAggGroupBy = 
Utils.fastToImmutableList(localAggGroupBySet);
+        boolean isGroupByEmptySelectEmpty = localAggGroupBy.isEmpty() && 
localAggOutput.isEmpty();
+        // be not recommend generate an aggregate node with empty group by and 
empty output,
+        // so add a null int slot to group by slot and output
+        if (isGroupByEmptySelectEmpty) {
+            localAggGroupBy = ImmutableList.of(new 
NullLiteral(TinyIntType.INSTANCE));
+            localAggOutput = ImmutableList.of(new Alias(new 
NullLiteral(TinyIntType.INSTANCE)));
+        }
+        return new PhysicalHashAggregate<>(localAggGroupBy, localAggOutput, 
Optional.ofNullable(partitionExpressions),
+                inputToBufferParam, 
AggregateUtils.maybeUsingStreamAgg(localAggGroupBy, inputToBufferParam),
+                null, child);
+    }
+
+    protected PhysicalHashAggregate<? extends Plan> 
splitDeduplicateTwoPhase(LogicalAggregate<? extends Plan> aggregate,
+            Map<AggregateFunction, Alias> middleAggFunctionToAlias, 
List<Expression> partitionExpressions,
+            Set<NamedExpression> localAggGroupBySet) {
+        // first phase
+        AggregateParam inputToBufferParam = AggregateParam.LOCAL_BUFFER;
+        Map<AggregateFunction, Alias> localAggFunctionToAlias = new 
LinkedHashMap<>();
+        PhysicalHashAggregate<? extends Plan> localAgg = 
splitDeduplicateOnePhase(aggregate, localAggGroupBySet,
+                inputToBufferParam, inputToBufferParam, 
localAggFunctionToAlias, aggregate.child(), ImmutableList.of());
+
+        // second phase
+        AggregateParam bufferToBufferParam = new 
AggregateParam(AggPhase.GLOBAL, AggMode.BUFFER_TO_BUFFER);
+        localAggFunctionToAlias.entrySet()
+                .stream()
+                .collect(Collectors.toMap(Entry::getKey,
+                        kv -> {
+                            AggregateExpression middleAggExpr = new 
AggregateExpression(kv.getKey(),
+                                    bufferToBufferParam, 
kv.getValue().toSlot());
+                            return new Alias(middleAggExpr);
+                        },
+                        (existing, replacement) -> existing,
+                        () -> middleAggFunctionToAlias));
+        List<NamedExpression> middleAggOutput = 
ImmutableList.<NamedExpression>builder()
+                .addAll(localAggGroupBySet)
+                .addAll(middleAggFunctionToAlias.values())
+                .build();
+        if (middleAggOutput.isEmpty()) {
+            middleAggOutput = ImmutableList.of(new Alias(new 
NullLiteral(TinyIntType.INSTANCE)));
+        }
+        return new PhysicalHashAggregate<>(localAgg.getGroupByExpressions(), 
middleAggOutput,
+                Optional.ofNullable(partitionExpressions), bufferToBufferParam,
+                
AggregateUtils.maybeUsingStreamAgg(localAgg.getGroupByExpressions(), 
bufferToBufferParam),
+                null, localAgg);
+    }
+
+    protected PhysicalHashAggregate<? extends Plan> 
splitDistinctTwoPhase(LogicalAggregate<? extends Plan> aggregate,

Review Comment:
   Please add a comment showing the plan-shape transformation (the plan tree before and after this split).



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggBaseRule.java:
##########
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.implementation;
+
+import org.apache.doris.nereids.trees.expressions.AggregateExpression;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import 
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
+import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam;
+import org.apache.doris.nereids.trees.expressions.functions.agg.Count;
+import org.apache.doris.nereids.trees.expressions.literal.NullLiteral;
+import org.apache.doris.nereids.trees.plans.AggMode;
+import org.apache.doris.nereids.trees.plans.AggPhase;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
+import org.apache.doris.nereids.types.TinyIntType;
+import org.apache.doris.nereids.util.AggregateUtils;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.nereids.util.Utils;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**SplitAggRule*/
+public abstract class SplitAggBaseRule {
+    protected PhysicalHashAggregate<? extends Plan> 
splitDeduplicateOnePhase(LogicalAggregate<? extends Plan> aggregate,
+            Set<NamedExpression> localAggGroupBySet, AggregateParam 
inputToBufferParam, AggregateParam paramForAggFunc,
+            Map<AggregateFunction, Alias> localAggFunctionToAlias, Plan child, 
List<Expression> partitionExpressions) {
+        aggregate.getAggregateFunctions().stream()
+                .filter(aggFunc -> !aggFunc.isDistinct())
+                .collect(Collectors.toMap(
+                        expr -> expr,
+                        expr -> {
+                            AggregateExpression localAggExpr = new 
AggregateExpression(expr, paramForAggFunc);
+                            return new Alias(localAggExpr);
+                        },
+                        (existing, replacement) -> existing,
+                        () -> localAggFunctionToAlias
+                ));
+        List<NamedExpression> localAggOutput = 
ImmutableList.<NamedExpression>builder()
+                .addAll(localAggGroupBySet)
+                .addAll(localAggFunctionToAlias.values())
+                .build();
+        List<Expression> localAggGroupBy = 
Utils.fastToImmutableList(localAggGroupBySet);
+        boolean isGroupByEmptySelectEmpty = localAggGroupBy.isEmpty() && 
localAggOutput.isEmpty();
+        // be not recommend generate an aggregate node with empty group by and 
empty output,
+        // so add a null int slot to group by slot and output
+        if (isGroupByEmptySelectEmpty) {
+            localAggGroupBy = ImmutableList.of(new 
NullLiteral(TinyIntType.INSTANCE));
+            localAggOutput = ImmutableList.of(new Alias(new 
NullLiteral(TinyIntType.INSTANCE)));
+        }
+        return new PhysicalHashAggregate<>(localAggGroupBy, localAggOutput, 
Optional.ofNullable(partitionExpressions),
+                inputToBufferParam, 
AggregateUtils.maybeUsingStreamAgg(localAggGroupBy, inputToBufferParam),
+                null, child);
+    }
+
+    protected PhysicalHashAggregate<? extends Plan> 
splitDeduplicateTwoPhase(LogicalAggregate<? extends Plan> aggregate,

Review Comment:
   Please add a comment showing the plan-shape transformation (the plan tree before and after this split).



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggMultiPhaseWithoutGbyKey.java:
##########
@@ -0,0 +1,221 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.implementation;
+
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.rules.exploration.ExplorationRuleFactory;
+import org.apache.doris.nereids.trees.expressions.AggregateExpression;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import 
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
+import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam;
+import org.apache.doris.nereids.trees.expressions.functions.agg.AggregatePhase;
+import org.apache.doris.nereids.trees.expressions.functions.agg.Count;
+import org.apache.doris.nereids.trees.expressions.functions.agg.GroupConcat;
+import org.apache.doris.nereids.trees.expressions.functions.agg.Max;
+import org.apache.doris.nereids.trees.expressions.functions.agg.Min;
+import 
org.apache.doris.nereids.trees.expressions.functions.agg.MultiDistinctCount;
+import 
org.apache.doris.nereids.trees.expressions.functions.agg.MultiDistinctGroupConcat;
+import 
org.apache.doris.nereids.trees.expressions.functions.agg.MultiDistinctSum;
+import 
org.apache.doris.nereids.trees.expressions.functions.agg.MultiDistinctSum0;
+import 
org.apache.doris.nereids.trees.expressions.functions.agg.MultiDistinction;
+import org.apache.doris.nereids.trees.expressions.functions.agg.Sum;
+import org.apache.doris.nereids.trees.expressions.functions.agg.Sum0;
+import org.apache.doris.nereids.trees.plans.AggMode;
+import org.apache.doris.nereids.trees.plans.AggPhase;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.algebra.Aggregate;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
+import org.apache.doris.nereids.util.AggregateUtils;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.nereids.util.Utils;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Supplier;
+
+/**SplitAggMultiPhaseWithoutGbyKey*/
+public class SplitAggMultiPhaseWithoutGbyKey extends SplitAggBaseRule 
implements ExplorationRuleFactory {

Review Comment:
   Please rename this class to `SplitDistinctAggWithoutGbyKey`. Also, should it implement `ImplementationRuleFactory` instead?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java:
##########
@@ -110,74 +112,141 @@ public List<List<PhysicalProperties>> 
visitPhysicalHashAggregate(
         if (agg.getGroupByExpressions().isEmpty() && 
agg.getOutputExpressions().isEmpty()) {
             return ImmutableList.of();
         }
+        // If the origin attribute satisfies the group by key but does not 
meet the requirements, ban the plan.
+        // e.g. select count(distinct a) from t group by b;
+        // requiredChildProperty: a
+        // but the child is already distributed by b
+        // ban this plan
+        PhysicalProperties originChildProperty = 
originChildrenProperties.get(0);
+        PhysicalProperties requiredChildProperty = requiredProperties.get(0);
+        PhysicalProperties hashSpec = 
PhysicalProperties.createHash(agg.getGroupByExpressions(), ShuffleType.REQUIRE);
+        GroupExpression child = children.get(0);
+        if (child.getPlan() instanceof PhysicalDistribute) {
+            PhysicalProperties properties = new PhysicalProperties(
+                    DistributionSpecAny.INSTANCE, 
originChildProperty.getOrderSpec());
+            GroupExpression distributeChild = 
child.getOwnerGroup().getLowestCostPlan(properties).get().second;
+            PhysicalProperties distributeChildProperties = 
distributeChild.getOutputProperties(properties);
+            if (distributeChildProperties.satisfy(hashSpec)
+                    && 
!distributeChildProperties.satisfy(requiredChildProperty)) {
+                return ImmutableList.of();
+            }
+        }
+
         if (!agg.getAggregateParam().canBeBanned) {
             return visit(agg, context);
         }
-        // forbid one phase agg on distribute
-        if (agg.getAggMode() == AggMode.INPUT_TO_RESULT && 
children.get(0).getPlan() instanceof PhysicalDistribute) {
-            // this means one stage gather agg, usually bad pattern
+        // return aggBanByStatistics(agg, context);
+        if (shouldBanOnePhaseAgg(agg, requiredChildProperty)) {
             return ImmutableList.of();
         }
+        // process must shuffle
+        return visit(agg, context);
+    }
 
-        // forbid TWO_PHASE_AGGREGATE_WITH_DISTINCT after shuffle
-        // TODO: this is forbid good plan after cte reuse by mistake
-        if (agg.getAggMode() == AggMode.INPUT_TO_BUFFER
-                && requiredProperties.get(0).getDistributionSpec() instanceof 
DistributionSpecHash
-                && children.get(0).getPlan() instanceof PhysicalDistribute) {
-            return ImmutableList.of();
+    /**
+     * 1. Generally, one-stage AGG is disabled unless the child of distribute 
is a CTE consumer.
+     * 2. If it is a CTE consumer, to avoid being banned, ensure that 
distribute is not a gather.
+     *    Alternatively, if the distribute is a shuffle, ensure that the 
shuffle expr is not skewed.
+     * */
+    private boolean shouldBanOnePhaseAgg(PhysicalHashAggregate<? extends Plan> 
aggregate,
+            PhysicalProperties requiredChildProperty) {
+        if (banAggUnionAll(aggregate)) {
+            return true;
+        }
+        if (!onePhaseAggWithDistribute(aggregate)) {
+            return false;
         }
+        if (childIsCTEConsumer()) {
+            // shape is agg-distribute-CTEConsumer
+            // distribute is gather
+            if (requireGather(requiredChildProperty)) {
+                return true;
+            }
+            // group by key is skew
+            if (skewOnShuffleExpr(aggregate)) {
+                return true;
+            }
+            return false;
 
-        // agg(group by x)-union all(A, B)
-        // no matter x.ndv is high or not, it is not worthwhile to shuffle A 
and B by x
-        // and hence we forbid one phase agg
-        if (agg.getAggMode() == AggMode.INPUT_TO_RESULT
-                && children.get(0).getPlan() instanceof PhysicalUnion
-                && !((PhysicalUnion) children.get(0).getPlan()).isDistinct()) {
-            return ImmutableList.of();
+        } else {
+            return true;
         }
-        // forbid multi distinct opt that bad than multi-stage version when 
multi-stage can be executed in one fragment
-        if (agg.getAggMode() == AggMode.INPUT_TO_BUFFER || agg.getAggMode() == 
AggMode.INPUT_TO_RESULT) {
-            List<MultiDistinction> multiDistinctions = 
agg.getOutputExpressions().stream()
-                    .filter(Alias.class::isInstance)
-                    .map(a -> ((Alias) a).child())
-                    .filter(AggregateExpression.class::isInstance)
-                    .map(a -> ((AggregateExpression) a).getFunction())
-                    .filter(MultiDistinction.class::isInstance)
-                    .map(MultiDistinction.class::cast)
-                    .collect(Collectors.toList());
-            if (multiDistinctions.size() == 1) {
-                Expression distinctChild = multiDistinctions.get(0).child(0);
-                DistributionSpec childDistribution = 
originChildrenProperties.get(0).getDistributionSpec();
-                if (distinctChild instanceof SlotReference && 
childDistribution instanceof DistributionSpecHash) {
-                    SlotReference slotReference = (SlotReference) 
distinctChild;
-                    DistributionSpecHash distributionSpecHash = 
(DistributionSpecHash) childDistribution;
-                    List<ExprId> groupByColumns = 
agg.getGroupByExpressions().stream()
-                            .map(SlotReference.class::cast)
-                            .map(SlotReference::getExprId)
-                            .collect(Collectors.toList());
-                    DistributionSpecHash groupByRequire = new 
DistributionSpecHash(
-                            groupByColumns, ShuffleType.REQUIRE);
-                    List<ExprId> distinctChildColumns = 
Lists.newArrayList(slotReference.getExprId());
-                    distinctChildColumns.add(slotReference.getExprId());
-                    DistributionSpecHash distinctChildRequire = new 
DistributionSpecHash(
-                            distinctChildColumns, ShuffleType.REQUIRE);
-                    if ((!groupByColumns.isEmpty() && 
distributionSpecHash.satisfy(groupByRequire))
-                            || (groupByColumns.isEmpty() && 
distributionSpecHash.satisfy(distinctChildRequire))) {
-                        if (!agg.mustUseMultiDistinctAgg()) {
-                            return ImmutableList.of();
-                        }
-                    }
-                }
-                // if distinct without group by key, we prefer three or four 
stage distinct agg
-                // because the second phase of multi-distinct only have one 
instance, and it is slow generally.
-                if (agg.getOutputExpressions().size() == 1 && 
agg.getGroupByExpressions().isEmpty()
-                        && !agg.mustUseMultiDistinctAgg()) {
-                    return ImmutableList.of();
-                }
+    }
+
+    private boolean skewOnShuffleExpr(PhysicalHashAggregate<? extends Plan> 
agg) {
+        // if statistic is unknown -> not skew
+        Statistics aggStatistics = 
agg.getGroupExpression().get().getOwnerGroup().getStatistics();
+        Statistics inputStatistics = 
agg.getGroupExpression().get().childStatistics(0);
+        if (aggStatistics == null || inputStatistics == null) {
+            return false;
+        }
+        if (AggregateUtils.hasUnknownStatistics(agg.getGroupByExpressions(), 
inputStatistics)) {
+            return false;
+        }
+        // There are two cases of skew:
+        double gbyNdv = aggStatistics.getRowCount();
+        // 1. ndv is very low
+        if (gbyNdv < LOW_NDV_THRESHOLD) {
+            return true;
+        }
+        // 2. There is a hot value, and the ndv of other keys is very low
+        if (isSkew(agg.getGroupByExpressions(), inputStatistics)) {
+            return true;
+        }
+        return false;
+    }
+
+    // if one group by key has hot value, and others ndv is low -> skew
+    private boolean isSkew(List<Expression> groupBy, Statistics 
inputStatistics) {
+        for (int i = 0; i < groupBy.size(); ++i) {
+            Expression expr = groupBy.get(i);
+            ColumnStatistic colStat = 
inputStatistics.findColumnStatistics(expr);
+            if (colStat == null) {
+                continue;
+            }
+            if (colStat.getHotValues() == null) {
+                continue;
+            }
+            List<Expression> otherExpr = excludeElement(groupBy, i);
+            double otherNdv = 
StatsCalculator.estimateGroupByRowCount(otherExpr, inputStatistics);
+            if (otherNdv < LOW_NDV_THRESHOLD) {
+                return true;
             }
         }
-        // process must shuffle
-        return visit(agg, context);
+        return false;
+    }
+
+    private static <T> List<T> excludeElement(List<T> list, int index) {
+        List<T> newList = new ArrayList<>();
+        for (int i = 0; i < list.size(); i++) {
+            if (index != i) {
+                newList.add(list.get(i));
+            }
+        }
+        return newList;
+    }
+
+    private boolean onePhaseAggWithDistribute(PhysicalHashAggregate<? extends 
Plan> aggregate) {
+        return aggregate.getAggMode() == AggMode.INPUT_TO_RESULT
+                && children.get(0).getPlan() instanceof PhysicalDistribute;
+    }
+
+    private boolean childIsCTEConsumer() {
+        return 
children.get(0).children().get(0).getPhysicalExpressions().get(0).getPlan()
+                instanceof PhysicalCTEConsumer;

Review Comment:
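   This chained `get(0)` access may throw (NPE or IndexOutOfBoundsException) if the child group has no children or no physical expressions. Please add a guard.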



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SplitMultiDistinctStrategy.java:
##########
@@ -211,38 +160,28 @@ private static boolean 
isDistinctMultiColumns(AggregateFunction func) {
         return false;
     }
 
-    private static boolean needTransform(LogicalAggregate<Plan> agg, 
List<Alias> aliases, List<Alias> otherAggFuncs) {
+    private static void collectInfo(LogicalAggregate<? extends Plan> agg, 
List<List<Alias>> aliases,

Review Comment:
   Rename to `collectDistinctAndNonDistinctFunctions`.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java:
##########
@@ -1111,6 +1112,8 @@ public PlanFragment visitPhysicalHashAggregate(
         // 2. collect agg expressions and generate agg function to slot 
reference map
         List<Slot> aggFunctionOutput = Lists.newArrayList();
         ArrayList<FunctionCallExpr> execAggregateFunctions = 
Lists.newArrayListWithCapacity(outputExpressions.size());
+        boolean isPartial = 
aggregate.getAggregateParam().aggMode.productAggregateBuffer;

Review Comment:
   @feiniaofeiafei Why is the aggMode partial here while the aggregate still contains partial aggregation functions?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAgg.java:
##########
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.implementation;
+
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.rules.exploration.OneExplorationRuleFactory;
+import org.apache.doris.nereids.trees.expressions.AggregateExpression;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import 
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
+import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam;
+import org.apache.doris.nereids.trees.expressions.functions.agg.AggregatePhase;
+import org.apache.doris.nereids.trees.plans.AggMode;
+import org.apache.doris.nereids.trees.plans.AggPhase;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.algebra.Aggregate;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
+import org.apache.doris.nereids.util.AggregateUtils;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.statistics.Statistics;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import java.util.List;
+import java.util.Map;
+
+/**SplitAgg
+ * only process agg without distinct function, split Agg into 2 phase: local 
agg and global agg
+ * */
+public class SplitAgg extends OneExplorationRuleFactory {
+    public static final SplitAgg INSTANCE = new SplitAgg();
+
+    @Override
+    public Rule build() {
+        return logicalAggregate()
+                .whenNot(Aggregate::hasDistinctFunc)
+                .thenApplyMulti(ctx -> rewrite(ctx.root))
+                .toRule(RuleType.SPLIT_AGG);
+    }
+
+    private List<Plan> rewrite(LogicalAggregate<? extends Plan> aggregate) {
+        return ImmutableList.<Plan>builder()
+                .addAll(splitTwoPhase(aggregate))
+                .addAll(implementOnePhase(aggregate))
+                .build();
+    }
+
+    private List<Plan> implementOnePhase(LogicalAggregate<? extends Plan> 
logicalAgg) {

Review Comment:
   Add comments showing how the plan shape is transformed, as is done in 
`AggregateStrategies.onePhaseAggregateWithMultiDistinct`.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SplitMultiDistinctStrategy.java:
##########
@@ -211,38 +160,28 @@ private static boolean 
isDistinctMultiColumns(AggregateFunction func) {
         return false;
     }

Review Comment:
   Is this function unused? If so, please delete it.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggBaseRule.java:
##########
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.implementation;
+
+import org.apache.doris.nereids.trees.expressions.AggregateExpression;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import 
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
+import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam;
+import org.apache.doris.nereids.trees.expressions.functions.agg.Count;
+import org.apache.doris.nereids.trees.expressions.literal.NullLiteral;
+import org.apache.doris.nereids.trees.plans.AggMode;
+import org.apache.doris.nereids.trees.plans.AggPhase;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
+import org.apache.doris.nereids.types.TinyIntType;
+import org.apache.doris.nereids.util.AggregateUtils;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.nereids.util.Utils;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**SplitAggRule*/
+public abstract class SplitAggBaseRule {
+    protected PhysicalHashAggregate<? extends Plan> 
splitDeduplicateOnePhase(LogicalAggregate<? extends Plan> aggregate,

Review Comment:
   Add comments describing the plan-shape transformation.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAgg.java:
##########
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.implementation;
+
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.rules.exploration.OneExplorationRuleFactory;
+import org.apache.doris.nereids.trees.expressions.AggregateExpression;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import 
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
+import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam;
+import org.apache.doris.nereids.trees.expressions.functions.agg.AggregatePhase;
+import org.apache.doris.nereids.trees.plans.AggMode;
+import org.apache.doris.nereids.trees.plans.AggPhase;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.algebra.Aggregate;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
+import org.apache.doris.nereids.util.AggregateUtils;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.statistics.Statistics;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import java.util.List;
+import java.util.Map;
+
+/**SplitAgg
+ * only process agg without distinct function, split Agg into 2 phase: local 
agg and global agg
+ * */
+public class SplitAgg extends OneExplorationRuleFactory {
+    public static final SplitAgg INSTANCE = new SplitAgg();
+
+    @Override
+    public Rule build() {
+        return logicalAggregate()
+                .whenNot(Aggregate::hasDistinctFunc)
+                .thenApplyMulti(ctx -> rewrite(ctx.root))
+                .toRule(RuleType.SPLIT_AGG);
+    }
+
+    private List<Plan> rewrite(LogicalAggregate<? extends Plan> aggregate) {
+        return ImmutableList.<Plan>builder()
+                .addAll(splitTwoPhase(aggregate))
+                .addAll(implementOnePhase(aggregate))
+                .build();
+    }
+
+    private List<Plan> implementOnePhase(LogicalAggregate<? extends Plan> 
logicalAgg) {
+        if (!logicalAgg.supportAggregatePhase(AggregatePhase.ONE)) {
+            return ImmutableList.of();
+        }
+        ImmutableList.Builder<NamedExpression> builder = 
ImmutableList.builder();
+        boolean changed = false;
+        for (NamedExpression expr : logicalAgg.getOutputExpressions()) {
+            if (expr instanceof Alias && expr.child(0) instanceof 
AggregateFunction) {
+                Alias alias = (Alias) expr;
+                AggregateExpression aggExpr = new 
AggregateExpression((AggregateFunction) expr.child(0),
+                        AggregateParam.GLOBAL_RESULT);
+                builder.add(alias.withChildren(ImmutableList.of(aggExpr)));
+                changed = true;
+            } else {
+                builder.add(expr);
+            }
+        }
+        AggregateParam param = new AggregateParam(AggPhase.GLOBAL, 
AggMode.INPUT_TO_RESULT, !skipRegulator(logicalAgg));
+        List<NamedExpression> aggOutput = changed ? builder.build() : 
logicalAgg.getOutputExpressions();
+        return ImmutableList.of(new 
PhysicalHashAggregate<>(logicalAgg.getGroupByExpressions(), aggOutput, param,
+                
AggregateUtils.maybeUsingStreamAgg(logicalAgg.getGroupByExpressions(), param),
+                null, logicalAgg.child()));
+    }
+
+    private List<Plan> splitTwoPhase(LogicalAggregate<? extends Plan> 
aggregate) {

Review Comment:
   Add comments showing how the plan shape is transformed.



##########
fe/fe-core/src/test/java/org/apache/doris/planner/QueryCacheNormalizerTest.java:
##########
@@ -322,6 +332,7 @@ public void testSelectFromGroupBy() {
         });
     }
 
+    @Disabled

Review Comment:
   This test should not be disabled.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggMultiPhase.java:
##########
@@ -0,0 +1,366 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.implementation;
+
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.pattern.MatchingContext;
+import org.apache.doris.nereids.properties.ChildrenPropertiesRegulator;
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.rules.exploration.ExplorationRuleFactory;
+import org.apache.doris.nereids.trees.expressions.AggregateExpression;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.Cast;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.Mod;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import 
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
+import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam;
+import org.apache.doris.nereids.trees.expressions.functions.agg.AggregatePhase;
+import org.apache.doris.nereids.trees.expressions.functions.agg.Count;
+import org.apache.doris.nereids.trees.expressions.functions.agg.Sum0;
+import 
org.apache.doris.nereids.trees.expressions.functions.agg.SupportMultiDistinct;
+import org.apache.doris.nereids.trees.expressions.functions.scalar.XxHash32;
+import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral;
+import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral;
+import org.apache.doris.nereids.trees.plans.AggMode;
+import org.apache.doris.nereids.trees.plans.AggPhase;
+import org.apache.doris.nereids.trees.plans.GroupPlan;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
+import org.apache.doris.nereids.types.DataType;
+import org.apache.doris.nereids.types.SmallIntType;
+import org.apache.doris.nereids.types.StringType;
+import org.apache.doris.nereids.types.TinyIntType;
+import org.apache.doris.nereids.util.AggregateUtils;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.nereids.util.TypeCoercionUtils;
+import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.statistics.Statistics;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**SplitAggMultiPhase
+ * only process agg with distinct function, split Agg into multi phase
+ * */
+public class SplitAggMultiPhase extends SplitAggBaseRule implements 
ExplorationRuleFactory {
+    public static final SplitAggMultiPhase INSTANCE = new SplitAggMultiPhase();
+    private static final String SALT_EXPR = "saltExpr";
+
+    @Override
+    public List<Rule> buildRules() {
+        return ImmutableList.of(
+                logicalAggregate()
+                        .when(agg -> !agg.getGroupByExpressions().isEmpty())
+                        .when(agg -> agg.getDistinctArguments().size() == 1 || 
agg.distinctFuncNum() == 1)
+                        .thenApplyMulti(this::rewrite)
+                        .toRule(RuleType.SPLIT_AGG_MULTI_PHASE)
+        );
+    }
+
+    private List<Plan> rewrite(MatchingContext<LogicalAggregate<GroupPlan>> 
ctx) {
+        LogicalAggregate<? extends Plan> aggregate = ctx.root;
+        if (aggregate.canSkewRewrite()) {
+            return ImmutableList.of(aggSkewRewrite(aggregate, 
ctx.cascadesContext));
+        } else {
+            if (twoPlusOneBetterThanTwoPlusTwo(aggregate)) {
+                return ImmutableList.<Plan>builder()
+                        .addAll(splitToTwoPlusOnePhase(aggregate))
+                        .addAll(splitToOnePlusTwoPhase(aggregate))
+                        .build();
+            } else {
+                return ImmutableList.<Plan>builder()
+                        .addAll(splitToTwoPlusTwoPhase(aggregate))
+                        .build();
+            }
+        }
+    }
+
+    /**
+     * select count(distinct a) group by b (deduplicated agg hashShuffle by 
group by key b)
+     * splitToTwoPlusOnePhase:
+     *   agg(group by b, count(a); distinct global)
+     *     +--agg(group by a,b; global)
+     *       +--hashShuffle(b)
+     *         +--agg(group by a,b; local)
+     *   agg(group by b, count(a); distinct global)
+     *     +--agg(group by a,b; global)
+     *       +--hashShuffle(b)
+     */
+    private List<Plan> splitToTwoPlusOnePhase(LogicalAggregate<? extends Plan> 
aggregate) {
+        ImmutableList.Builder<Plan> builder = ImmutableList.builder();
+        if (aggregate.supportAggregatePhase(AggregatePhase.THREE)) {
+            Set<NamedExpression> localAggGroupBySet = 
AggregateUtils.getAllKeySet(aggregate);
+            Map<AggregateFunction, Alias> middleAggFuncToAlias = new 
LinkedHashMap<>();
+            Plan middleAgg = splitDeduplicateTwoPhase(aggregate, 
middleAggFuncToAlias,
+                    aggregate.getGroupByExpressions(), localAggGroupBySet);
+            builder.add(splitDistinctOnePhase(aggregate, middleAggFuncToAlias, 
middleAgg));
+        }
+        if (aggregate.supportAggregatePhase(AggregatePhase.TWO)) {
+            Map<AggregateFunction, Alias> localAggFuncToAlias = new 
HashMap<>();
+            Plan localAgg = splitToOnePhase(aggregate, 
Utils.fastToImmutableList(aggregate.getGroupByExpressions()),
+                    localAggFuncToAlias);
+            builder.add(splitDistinctOnePhase(aggregate, localAggFuncToAlias, 
localAgg));
+        }
+        return builder.build();
+    }
+
+    private Plan splitToOnePhase(LogicalAggregate<? extends Plan> aggregate,
+            List<Expression> partitionExpressions, Map<AggregateFunction, 
Alias> localAggFunctionToAlias) {
+        Set<NamedExpression> localAggGroupBySet = 
AggregateUtils.getAllKeySet(aggregate);
+        // first phase
+        AggregateParam inputToResultParamFirst = new 
AggregateParam(AggPhase.GLOBAL, AggMode.INPUT_TO_RESULT);
+        AggregateParam paramForAggFunc = new AggregateParam(AggPhase.GLOBAL, 
AggMode.INPUT_TO_BUFFER);
+        return splitDeduplicateOnePhase(aggregate, localAggGroupBySet, 
inputToResultParamFirst,
+                paramForAggFunc, localAggFunctionToAlias, aggregate.child(),
+                partitionExpressions);
+    }
+
+    /**
+     * select count(distinct a) group by b (deduplicated agg hashShuffle by 
group by key b)
+     * splitToTwoPlusTwoPhase:
+     *   agg(group by b, count(a); distinct global)
+     *     +--hashShuffle(b)
+     *       +--agg(group by b, count(a); distinct local)
+     *         +--agg(group by a,b; global)
+     *           +--hashShuffle(a,b)
+     *             +--agg(group by a,b; local)
+     *   agg(group by b, count(a); distinct global)
+     *     +--hashShuffle(b)
+     *       +--agg(group by b, count(a); distinct local)
+     *         +--agg(group by a,b; global)
+     *           +--hashShuffle(a,b)
+     */
+    private List<Plan> splitToTwoPlusTwoPhase(LogicalAggregate<? extends Plan> 
aggregate) {
+        ImmutableList.Builder<Plan> builder = ImmutableList.builder();
+        Set<NamedExpression> localAggGroupBySet = 
AggregateUtils.getAllKeySet(aggregate);
+        if (aggregate.supportAggregatePhase(AggregatePhase.FOUR)) {
+            Map<AggregateFunction, Alias> middleAggFunctionToAlias = new 
LinkedHashMap<>();
+            Plan twoPhaseAgg = splitDeduplicateTwoPhase(aggregate, 
middleAggFunctionToAlias,
+                    Utils.fastToImmutableList(localAggGroupBySet), 
localAggGroupBySet);
+            builder.add(splitDistinctTwoPhase(aggregate, 
middleAggFunctionToAlias, twoPhaseAgg));
+        }
+        Map<AggregateFunction, Alias> localAggFunctionToAlias = new 
HashMap<>();
+        Plan onePhaseAgg = splitToOnePhase(aggregate, 
Utils.fastToImmutableList(localAggGroupBySet),
+                localAggFunctionToAlias);
+        if (aggregate.supportAggregatePhase(AggregatePhase.THREE)) {
+            builder.add(splitDistinctTwoPhase(aggregate, 
localAggFunctionToAlias, onePhaseAgg));
+        }
+        if (aggregate.supportAggregatePhase(AggregatePhase.TWO)) {
+            builder.add(splitDistinctOnePhase(aggregate, 
localAggFunctionToAlias, onePhaseAgg));
+        }
+        return builder.build();
+    }
+
+    /**
+     * select count(distinct a) group by b (deduplicated agg hashShuffle by 
group by key b)
+     * splitToOnePlusTwoPhase: (deduplicated agg hashShuffle by distinct key a)
+     *   agg(group by b, count(a); distinct global)
+     *     +--hashShuffle(b)
+     *       +--agg(group by b, count(a); distinct local)
+     *         +--agg(group by a,b; global)
+     *           +--hashShuffle(a)
+     */
+    private List<Plan> splitToOnePlusTwoPhase(LogicalAggregate<? extends Plan> 
aggregate) {
+        if (!aggregate.supportAggregatePhase(AggregatePhase.THREE)) {
+            return ImmutableList.of();
+        }
+        Set<NamedExpression> localAggGroupBySet = 
AggregateUtils.getAllKeySet(aggregate);
+        // first phase
+        AggregateParam paramForAgg = new AggregateParam(AggPhase.GLOBAL, 
AggMode.INPUT_TO_RESULT);
+        AggregateParam paramForAggFunc = new AggregateParam(AggPhase.GLOBAL, 
AggMode.INPUT_TO_BUFFER);
+
+        Map<AggregateFunction, Alias> localAggFunctionToAlias = new 
LinkedHashMap<>();
+        Plan localAgg = splitDeduplicateOnePhase(aggregate, 
localAggGroupBySet, paramForAgg, paramForAggFunc,
+                localAggFunctionToAlias, aggregate.child(),
+                Utils.fastToImmutableList(aggregate.getDistinctArguments()));
+        return ImmutableList.of(splitDistinctTwoPhase(aggregate, 
localAggFunctionToAlias, localAgg));
+    }
+
+    private PhysicalHashAggregate<? extends Plan> 
splitDistinctOnePhase(LogicalAggregate<? extends Plan> aggregate,

Review Comment:
   Add comments describing the plan-shape transformation.



##########
fe/fe-core/src/test/java/org/apache/doris/planner/QueryCacheNormalizerTest.java:
##########
@@ -335,6 +346,7 @@ public void phasesAgg() {
         Assertions.assertNotEquals(onePhaseAggPlans, twoPhaseAggPlans);
     }
 
+    @Disabled

Review Comment:
   should not disable



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggMultiPhase.java:
##########
@@ -0,0 +1,366 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.implementation;
+
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.pattern.MatchingContext;
+import org.apache.doris.nereids.properties.ChildrenPropertiesRegulator;
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.rules.exploration.ExplorationRuleFactory;
+import org.apache.doris.nereids.trees.expressions.AggregateExpression;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.Cast;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.Mod;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import 
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
+import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam;
+import org.apache.doris.nereids.trees.expressions.functions.agg.AggregatePhase;
+import org.apache.doris.nereids.trees.expressions.functions.agg.Count;
+import org.apache.doris.nereids.trees.expressions.functions.agg.Sum0;
+import 
org.apache.doris.nereids.trees.expressions.functions.agg.SupportMultiDistinct;
+import org.apache.doris.nereids.trees.expressions.functions.scalar.XxHash32;
+import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral;
+import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral;
+import org.apache.doris.nereids.trees.plans.AggMode;
+import org.apache.doris.nereids.trees.plans.AggPhase;
+import org.apache.doris.nereids.trees.plans.GroupPlan;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
+import org.apache.doris.nereids.types.DataType;
+import org.apache.doris.nereids.types.SmallIntType;
+import org.apache.doris.nereids.types.StringType;
+import org.apache.doris.nereids.types.TinyIntType;
+import org.apache.doris.nereids.util.AggregateUtils;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.nereids.util.TypeCoercionUtils;
+import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.statistics.Statistics;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**SplitAggMultiPhase
+ * only process agg with distinct function, split Agg into multi phase
+ * */
+public class SplitAggMultiPhase extends SplitAggBaseRule implements 
ExplorationRuleFactory {

Review Comment:
   Please rename this to `SplitDistinctAggWithGbyKey`. Also, should this use 
`ImplementationRuleFactory` instead?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java:
##########
@@ -216,7 +216,8 @@ public void execute() {
             // if break when running the loop above, the condition must be 
false.
             if (curChildIndex == groupExpression.arity()) {
                 if (!calculateEnforce(requestChildrenProperties, 
outputChildrenProperties)) {
-                    return; // if error exists, return
+                    clear();

Review Comment:
   +1



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAgg.java:
##########
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.implementation;
+
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.rules.exploration.OneExplorationRuleFactory;
+import org.apache.doris.nereids.trees.expressions.AggregateExpression;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import 
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
+import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam;
+import org.apache.doris.nereids.trees.expressions.functions.agg.AggregatePhase;
+import org.apache.doris.nereids.trees.plans.AggMode;
+import org.apache.doris.nereids.trees.plans.AggPhase;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.algebra.Aggregate;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
+import org.apache.doris.nereids.util.AggregateUtils;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.statistics.Statistics;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import java.util.List;
+import java.util.Map;
+
+/**SplitAgg
+ * only process agg without distinct function, split Agg into 2 phase: local 
agg and global agg
+ * */
+public class SplitAgg extends OneExplorationRuleFactory {
+    public static final SplitAgg INSTANCE = new SplitAgg();
+
+    @Override
+    public Rule build() {
+        return logicalAggregate()
+                .whenNot(Aggregate::hasDistinctFunc)
+                .thenApplyMulti(ctx -> rewrite(ctx.root))
+                .toRule(RuleType.SPLIT_AGG);
+    }
+
+    private List<Plan> rewrite(LogicalAggregate<? extends Plan> aggregate) {
+        return ImmutableList.<Plan>builder()
+                .addAll(splitTwoPhase(aggregate))
+                .addAll(implementOnePhase(aggregate))
+                .build();
+    }
+
+    private List<Plan> implementOnePhase(LogicalAggregate<? extends Plan> 
logicalAgg) {
+        if (!logicalAgg.supportAggregatePhase(AggregatePhase.ONE)) {
+            return ImmutableList.of();
+        }
+        ImmutableList.Builder<NamedExpression> builder = 
ImmutableList.builder();
+        boolean changed = false;
+        for (NamedExpression expr : logicalAgg.getOutputExpressions()) {
+            if (expr instanceof Alias && expr.child(0) instanceof 
AggregateFunction) {
+                Alias alias = (Alias) expr;
+                AggregateExpression aggExpr = new 
AggregateExpression((AggregateFunction) expr.child(0),
+                        AggregateParam.GLOBAL_RESULT);
+                builder.add(alias.withChildren(ImmutableList.of(aggExpr)));
+                changed = true;
+            } else {
+                builder.add(expr);
+            }
+        }
+        AggregateParam param = new AggregateParam(AggPhase.GLOBAL, 
AggMode.INPUT_TO_RESULT, !skipRegulator(logicalAgg));
+        List<NamedExpression> aggOutput = changed ? builder.build() : 
logicalAgg.getOutputExpressions();
+        return ImmutableList.of(new 
PhysicalHashAggregate<>(logicalAgg.getGroupByExpressions(), aggOutput, param,
+                
AggregateUtils.maybeUsingStreamAgg(logicalAgg.getGroupByExpressions(), param),
+                null, logicalAgg.child()));
+    }
+
+    private List<Plan> splitTwoPhase(LogicalAggregate<? extends Plan> 
aggregate) {
+        if (!aggregate.supportAggregatePhase(AggregatePhase.TWO)) {
+            return ImmutableList.of();
+        }
+        AggregateParam inputToBufferParam = new AggregateParam(AggPhase.LOCAL, 
AggMode.INPUT_TO_BUFFER);
+        Map<AggregateFunction, Alias> aggFunctionToAlias = 
aggregate.getAggregateFunctions().stream()
+                .collect(ImmutableMap.toImmutableMap(function -> function, 
function -> {
+                    AggregateExpression localAggFunc = new 
AggregateExpression(function, inputToBufferParam);
+                    return new Alias(localAggFunc);
+                }));
+        List<NamedExpression> localAggOutput = 
ImmutableList.<NamedExpression>builder()
+                .addAll((List) aggregate.getGroupByExpressions())
+                .addAll(aggFunctionToAlias.values())
+                .build();
+
+        PhysicalHashAggregate<? extends Plan> localAgg = new 
PhysicalHashAggregate<>(aggregate.getGroupByExpressions(),
+                localAggOutput, inputToBufferParam,
+                
AggregateUtils.maybeUsingStreamAgg(aggregate.getGroupByExpressions(), 
inputToBufferParam),
+                null, aggregate.child());
+
+        // global agg
+        AggregateParam bufferToResultParam = new 
AggregateParam(AggPhase.GLOBAL, AggMode.BUFFER_TO_RESULT);
+        List<NamedExpression> globalAggOutput = 
ExpressionUtils.rewriteDownShortCircuit(
+                aggregate.getOutputExpressions(), expr -> {
+                    if (!(expr instanceof AggregateFunction)) {
+                        return expr;
+                    }
+                    Alias alias = aggFunctionToAlias.get(expr);
+                    if (alias == null) {
+                        return expr;
+                    }
+                    AggregateFunction aggFunc = (AggregateFunction) expr;
+                    return new AggregateExpression(aggFunc, 
bufferToResultParam, alias.toSlot());
+                });
+        return ImmutableList.of(new 
PhysicalHashAggregate<>(aggregate.getGroupByExpressions(),
+                globalAggOutput, bufferToResultParam,
+                
AggregateUtils.maybeUsingStreamAgg(aggregate.getGroupByExpressions(), 
bufferToResultParam),
+                aggregate.getLogicalProperties(), localAgg));
+    }
+
+    private boolean shouldUseLocalAgg(LogicalAggregate<? extends Plan> 
aggregate) {
+        Statistics aggStats = 
aggregate.getGroupExpression().get().getOwnerGroup().getStatistics();
+        Statistics aggChildStats = 
aggregate.getGroupExpression().get().childStatistics(0);
+        // if gbyNdv is high, should not use local agg
+        double rows = aggChildStats.getRowCount();
+        double gbyNdv = aggStats.getRowCount();
+        return gbyNdv * 10 < rows;
+    }

Review Comment:
   This function is not used; please delete it.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DistinctAggStrategySelector.java:
##########
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite;
+
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.jobs.JobContext;
+import 
org.apache.doris.nereids.rules.rewrite.DistinctAggStrategySelector.DistinctSelectorContext;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCTEAnchor;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCTEProducer;
+import org.apache.doris.nereids.trees.plans.visitor.CustomRewriter;
+import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
+import org.apache.doris.nereids.util.AggregateUtils;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.statistics.ColumnStatistic;
+import org.apache.doris.statistics.Statistics;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Chooses the optimal execution strategy for queries with multiple DISTINCT 
aggregations.
+ *
+ * Handles queries like "SELECT COUNT(DISTINCT c1), COUNT(DISTINCT c2) FROM t" 
by selecting between:
+ * - CTE decomposition: Splits into multiple CTEs, each computing one DISTINCT 
aggregate
+ * - Multi-DISTINCT function: Processes all distinct function use multi 
distinct function
+ *
+ * Selection criteria includes:
+ * - Number of distinct aggregates
+ * - Estimated cardinality of distinct values
+ * - Available memory resources
+ * - Query complexity
+ */
+public class DistinctAggStrategySelector extends 
DefaultPlanRewriter<DistinctSelectorContext>
+        implements CustomRewriter {
+    public static DistinctAggStrategySelector INSTANCE = new 
DistinctAggStrategySelector();
+
+    /**DistinctSplitContext*/
+    public static class DistinctSelectorContext {
+        List<LogicalCTEProducer<? extends Plan>> cteProducerList;
+        StatementContext statementContext;
+        CascadesContext cascadesContext;
+
+        public DistinctSelectorContext(StatementContext statementContext, 
CascadesContext cascadesContext) {
+            this.statementContext = statementContext;
+            this.cteProducerList = new ArrayList<>();
+            this.cascadesContext = cascadesContext;
+        }
+    }
+
+    @Override
+    public Plan rewriteRoot(Plan plan, JobContext jobContext) {
+        DistinctSelectorContext ctx = new DistinctSelectorContext(
+                jobContext.getCascadesContext().getStatementContext(), 
jobContext.getCascadesContext());
+        plan = plan.accept(this, ctx);
+        for (int i = ctx.cteProducerList.size() - 1; i >= 0; i--) {
+            LogicalCTEProducer<? extends Plan> producer = 
ctx.cteProducerList.get(i);
+            plan = new LogicalCTEAnchor<>(producer.getCteId(), producer, plan);
+        }
+        return plan;
+    }
+
+    @Override
+    public Plan visitLogicalCTEAnchor(
+            LogicalCTEAnchor<? extends Plan, ? extends Plan> anchor, 
DistinctSelectorContext ctx) {
+        Plan child1 = anchor.child(0).accept(this, ctx);
+        DistinctSelectorContext consumerContext =
+                new DistinctSelectorContext(ctx.statementContext, 
ctx.cascadesContext);
+        Plan child2 = anchor.child(1).accept(this, consumerContext);
+        for (int i = consumerContext.cteProducerList.size() - 1; i >= 0; i--) {
+            LogicalCTEProducer<? extends Plan> producer = 
consumerContext.cteProducerList.get(i);
+            child2 = new LogicalCTEAnchor<>(producer.getCteId(), producer, 
child2);
+        }
+        return anchor.withChildren(ImmutableList.of(child1, child2));
+    }
+
+    @Override
+    public Plan visitLogicalAggregate(LogicalAggregate<? extends Plan> agg, 
DistinctSelectorContext ctx) {
+        Plan newChild = agg.child().accept(this, ctx);
+        agg = agg.withChildren(ImmutableList.of(newChild));
+        // process:
+        // count(distinct a,b);
+        // count(distinct a), sum(distinct a);
+        // count(distinct a)
+        // not process:
+        // count(distinct a,b), count(distinct a,c)
+        // count(distinct a), sum(distinct b)
+        if (agg.distinctFuncNum() < 2 || agg.getDistinctArguments().size() < 
2) {
+            return agg;
+        }
+        if (shouldUseMultiDistinct(agg)) {
+            return MultiDistinctFunctionStrategy.rewrite(agg);
+        } else {
+            return SplitMultiDistinctStrategy.rewrite(agg, ctx);
+        }
+    }
+
+    private boolean shouldUseMultiDistinct(LogicalAggregate<? extends Plan> 
agg) {
+        if (AggregateUtils.containsCountDistinctMultiExpr(agg)) {
+            return false;
+        }
+        if (ConnectContext.get().getSessionVariable().multiDistinctStrategy == 
1) {
+            return true;
+        } else if 
(ConnectContext.get().getSessionVariable().multiDistinctStrategy == 2) {
+            return false;
+        }

Review Comment:
   Can you document what each value of `multiDistinctStrategy` means?
   Also, please don't compute 
`ConnectContext.get().getSessionVariable().multiDistinctStrategy` twice; read it once into a local variable.



##########
fe/fe-core/src/test/java/org/apache/doris/planner/QueryCacheNormalizerTest.java:
##########
@@ -299,6 +308,7 @@ public void testSelectFromWhereNoGroupBy() throws Throwable 
{
         Assertions.assertEquals(queryCacheParam5.tablet_to_range, 
queryCacheParam6.tablet_to_range);
     }
 
+    @Disabled

Review Comment:
   should not disable



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to