924060929 commented on code in PR #54079:
URL: https://github.com/apache/doris/pull/54079#discussion_r2300232811
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CheckAfterRewrite.java:
##########
@@ -111,8 +110,8 @@ private void checkAllSlotReferenceFromChildren(Plan plan) {
notFromChildren = removeValidSlotsNotFromChildren(notFromChildren,
childrenOutput);
if (!notFromChildren.isEmpty()) {
if (plan.arity() != 0 && plan.child(0) instanceof
LogicalAggregate) {
- throw new AnalysisException(String.format("%s not in
aggregate's output", notFromChildren
-
.stream().map(NamedExpression::getName).collect(Collectors.joining(", "))));
+ // throw new AnalysisException(String.format("%s not in
aggregate's output", notFromChildren
+ //
.stream().map(NamedExpression::getName).collect(Collectors.joining(", "))));
Review Comment:
Should this check really be commented out?
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java:
##########
@@ -302,161 +255,6 @@ public List<Rule> buildRules() {
LogicalFileScan fileScan = project.child();
return storageLayerAggregate(agg, project, fileScan,
ctx.cascadesContext);
})
Review Comment:
`COUNT_ON_INDEX_WITHOUT_PROJECT` should be moved to a new file.
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAgg.java:
##########
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.implementation;
+
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.rules.exploration.OneExplorationRuleFactory;
+import org.apache.doris.nereids.trees.expressions.AggregateExpression;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
+import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam;
+import org.apache.doris.nereids.trees.expressions.functions.agg.AggregatePhase;
+import org.apache.doris.nereids.trees.plans.AggMode;
+import org.apache.doris.nereids.trees.plans.AggPhase;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.algebra.Aggregate;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
+import org.apache.doris.nereids.util.AggregateUtils;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.statistics.Statistics;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import java.util.List;
+import java.util.Map;
+
+/**SplitAgg
+ * only process agg without distinct function, split Agg into 2 phase: local
agg and global agg
+ * */
+public class SplitAgg extends OneExplorationRuleFactory {
Review Comment:
Rename to `SplitAggWithoutDistinct`. Also, maybe you should use
`OneImplementationRuleFactory`, because this rule generates physical plans?
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java:
##########
@@ -505,24 +507,18 @@ public enum RuleType {
STORAGE_LAYER_AGGREGATE_MINMAX_ON_UNIQUE_WITHOUT_PROJECT(RuleTypeClass.IMPLEMENTATION),
COUNT_ON_INDEX(RuleTypeClass.IMPLEMENTATION),
COUNT_ON_INDEX_WITHOUT_PROJECT(RuleTypeClass.IMPLEMENTATION),
- ONE_PHASE_AGGREGATE_WITHOUT_DISTINCT(RuleTypeClass.IMPLEMENTATION),
- TWO_PHASE_AGGREGATE_WITHOUT_DISTINCT(RuleTypeClass.IMPLEMENTATION),
-
TWO_PHASE_AGGREGATE_WITH_COUNT_DISTINCT_MULTI(RuleTypeClass.IMPLEMENTATION),
-
THREE_PHASE_AGGREGATE_WITH_COUNT_DISTINCT_MULTI(RuleTypeClass.IMPLEMENTATION),
TWO_PHASE_AGGREGATE_WITH_DISTINCT(RuleTypeClass.IMPLEMENTATION),
- ONE_PHASE_AGGREGATE_SINGLE_DISTINCT_TO_MULTI(RuleTypeClass.IMPLEMENTATION),
- TWO_PHASE_AGGREGATE_SINGLE_DISTINCT_TO_MULTI(RuleTypeClass.IMPLEMENTATION),
- TWO_PHASE_AGGREGATE_WITH_MULTI_DISTINCT(RuleTypeClass.IMPLEMENTATION),
- THREE_PHASE_AGGREGATE_WITH_DISTINCT(RuleTypeClass.IMPLEMENTATION),
- FOUR_PHASE_AGGREGATE_WITH_DISTINCT(RuleTypeClass.IMPLEMENTATION),
-
FOUR_PHASE_AGGREGATE_WITH_DISTINCT_WITH_FULL_DISTRIBUTE(RuleTypeClass.IMPLEMENTATION),
Review Comment:
You should skip unknown rule names in
`SessionVariable.getDisableNereidsRules` and similar places, because users may
have set the legacy rules in their session variables.
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggMultiPhase.java:
##########
@@ -0,0 +1,366 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.implementation;
+
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.pattern.MatchingContext;
+import org.apache.doris.nereids.properties.ChildrenPropertiesRegulator;
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.rules.exploration.ExplorationRuleFactory;
+import org.apache.doris.nereids.trees.expressions.AggregateExpression;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.Cast;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.Mod;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
+import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam;
+import org.apache.doris.nereids.trees.expressions.functions.agg.AggregatePhase;
+import org.apache.doris.nereids.trees.expressions.functions.agg.Count;
+import org.apache.doris.nereids.trees.expressions.functions.agg.Sum0;
+import
org.apache.doris.nereids.trees.expressions.functions.agg.SupportMultiDistinct;
+import org.apache.doris.nereids.trees.expressions.functions.scalar.XxHash32;
+import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral;
+import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral;
+import org.apache.doris.nereids.trees.plans.AggMode;
+import org.apache.doris.nereids.trees.plans.AggPhase;
+import org.apache.doris.nereids.trees.plans.GroupPlan;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
+import org.apache.doris.nereids.types.DataType;
+import org.apache.doris.nereids.types.SmallIntType;
+import org.apache.doris.nereids.types.StringType;
+import org.apache.doris.nereids.types.TinyIntType;
+import org.apache.doris.nereids.util.AggregateUtils;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.nereids.util.TypeCoercionUtils;
+import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.statistics.Statistics;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**SplitAggMultiPhase
+ * only process agg with distinct function, split Agg into multi phase
+ * */
+public class SplitAggMultiPhase extends SplitAggBaseRule implements
ExplorationRuleFactory {
+ public static final SplitAggMultiPhase INSTANCE = new SplitAggMultiPhase();
+ private static final String SALT_EXPR = "saltExpr";
+
+ @Override
+ public List<Rule> buildRules() {
+ return ImmutableList.of(
+ logicalAggregate()
+ .when(agg -> !agg.getGroupByExpressions().isEmpty())
+ .when(agg -> agg.getDistinctArguments().size() == 1 ||
agg.distinctFuncNum() == 1)
+ .thenApplyMulti(this::rewrite)
+ .toRule(RuleType.SPLIT_AGG_MULTI_PHASE)
+ );
+ }
+
+ private List<Plan> rewrite(MatchingContext<LogicalAggregate<GroupPlan>>
ctx) {
+ LogicalAggregate<? extends Plan> aggregate = ctx.root;
+ if (aggregate.canSkewRewrite()) {
+ return ImmutableList.of(aggSkewRewrite(aggregate,
ctx.cascadesContext));
+ } else {
+ if (twoPlusOneBetterThanTwoPlusTwo(aggregate)) {
+ return ImmutableList.<Plan>builder()
+ .addAll(splitToTwoPlusOnePhase(aggregate))
+ .addAll(splitToOnePlusTwoPhase(aggregate))
+ .build();
+ } else {
+ return ImmutableList.<Plan>builder()
+ .addAll(splitToTwoPlusTwoPhase(aggregate))
+ .build();
+ }
+ }
+ }
+
+ /**
+ * select count(distinct a) group by b (deduplicated agg hashShuffle by
group by key b)
+ * splitToTwoPlusOnePhase:
+ * agg(group by b, count(a); distinct global)
+ * +--agg(group by a,b; global)
+ * +--hashShuffle(b)
+ * +--agg(group by a,b; local)
+ * agg(group by b, count(a); distinct global)
+ * +--agg(group by a,b; global)
+ * +--hashShuffle(b)
+ */
+ private List<Plan> splitToTwoPlusOnePhase(LogicalAggregate<? extends Plan>
aggregate) {
+ ImmutableList.Builder<Plan> builder = ImmutableList.builder();
+ if (aggregate.supportAggregatePhase(AggregatePhase.THREE)) {
+ Set<NamedExpression> localAggGroupBySet =
AggregateUtils.getAllKeySet(aggregate);
+ Map<AggregateFunction, Alias> middleAggFuncToAlias = new
LinkedHashMap<>();
+ Plan middleAgg = splitDeduplicateTwoPhase(aggregate,
middleAggFuncToAlias,
+ aggregate.getGroupByExpressions(), localAggGroupBySet);
+ builder.add(splitDistinctOnePhase(aggregate, middleAggFuncToAlias,
middleAgg));
+ }
+ if (aggregate.supportAggregatePhase(AggregatePhase.TWO)) {
+ Map<AggregateFunction, Alias> localAggFuncToAlias = new
HashMap<>();
+ Plan localAgg = splitToOnePhase(aggregate,
Utils.fastToImmutableList(aggregate.getGroupByExpressions()),
+ localAggFuncToAlias);
+ builder.add(splitDistinctOnePhase(aggregate, localAggFuncToAlias,
localAgg));
+ }
+ return builder.build();
+ }
+
+ private Plan splitToOnePhase(LogicalAggregate<? extends Plan> aggregate,
Review Comment:
Please add comments describing the plan shape transformation.
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggBaseRule.java:
##########
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.implementation;
+
+import org.apache.doris.nereids.trees.expressions.AggregateExpression;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
+import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam;
+import org.apache.doris.nereids.trees.expressions.functions.agg.Count;
+import org.apache.doris.nereids.trees.expressions.literal.NullLiteral;
+import org.apache.doris.nereids.trees.plans.AggMode;
+import org.apache.doris.nereids.trees.plans.AggPhase;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
+import org.apache.doris.nereids.types.TinyIntType;
+import org.apache.doris.nereids.util.AggregateUtils;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.nereids.util.Utils;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**SplitAggRule*/
+public abstract class SplitAggBaseRule {
+ protected PhysicalHashAggregate<? extends Plan>
splitDeduplicateOnePhase(LogicalAggregate<? extends Plan> aggregate,
+ Set<NamedExpression> localAggGroupBySet, AggregateParam
inputToBufferParam, AggregateParam paramForAggFunc,
+ Map<AggregateFunction, Alias> localAggFunctionToAlias, Plan child,
List<Expression> partitionExpressions) {
+ aggregate.getAggregateFunctions().stream()
+ .filter(aggFunc -> !aggFunc.isDistinct())
+ .collect(Collectors.toMap(
+ expr -> expr,
+ expr -> {
+ AggregateExpression localAggExpr = new
AggregateExpression(expr, paramForAggFunc);
+ return new Alias(localAggExpr);
+ },
+ (existing, replacement) -> existing,
+ () -> localAggFunctionToAlias
+ ));
+ List<NamedExpression> localAggOutput =
ImmutableList.<NamedExpression>builder()
+ .addAll(localAggGroupBySet)
+ .addAll(localAggFunctionToAlias.values())
+ .build();
+ List<Expression> localAggGroupBy =
Utils.fastToImmutableList(localAggGroupBySet);
+ boolean isGroupByEmptySelectEmpty = localAggGroupBy.isEmpty() &&
localAggOutput.isEmpty();
+ // be not recommend generate an aggregate node with empty group by and
empty output,
+ // so add a null int slot to group by slot and output
+ if (isGroupByEmptySelectEmpty) {
+ localAggGroupBy = ImmutableList.of(new
NullLiteral(TinyIntType.INSTANCE));
+ localAggOutput = ImmutableList.of(new Alias(new
NullLiteral(TinyIntType.INSTANCE)));
+ }
+ return new PhysicalHashAggregate<>(localAggGroupBy, localAggOutput,
Optional.ofNullable(partitionExpressions),
+ inputToBufferParam,
AggregateUtils.maybeUsingStreamAgg(localAggGroupBy, inputToBufferParam),
+ null, child);
+ }
+
+ protected PhysicalHashAggregate<? extends Plan>
splitDeduplicateTwoPhase(LogicalAggregate<? extends Plan> aggregate,
+ Map<AggregateFunction, Alias> middleAggFunctionToAlias,
List<Expression> partitionExpressions,
+ Set<NamedExpression> localAggGroupBySet) {
+ // first phase
+ AggregateParam inputToBufferParam = AggregateParam.LOCAL_BUFFER;
+ Map<AggregateFunction, Alias> localAggFunctionToAlias = new
LinkedHashMap<>();
+ PhysicalHashAggregate<? extends Plan> localAgg =
splitDeduplicateOnePhase(aggregate, localAggGroupBySet,
+ inputToBufferParam, inputToBufferParam,
localAggFunctionToAlias, aggregate.child(), ImmutableList.of());
+
+ // second phase
+ AggregateParam bufferToBufferParam = new
AggregateParam(AggPhase.GLOBAL, AggMode.BUFFER_TO_BUFFER);
+ localAggFunctionToAlias.entrySet()
+ .stream()
+ .collect(Collectors.toMap(Entry::getKey,
+ kv -> {
+ AggregateExpression middleAggExpr = new
AggregateExpression(kv.getKey(),
+ bufferToBufferParam,
kv.getValue().toSlot());
+ return new Alias(middleAggExpr);
+ },
+ (existing, replacement) -> existing,
+ () -> middleAggFunctionToAlias));
+ List<NamedExpression> middleAggOutput =
ImmutableList.<NamedExpression>builder()
+ .addAll(localAggGroupBySet)
+ .addAll(middleAggFunctionToAlias.values())
+ .build();
+ if (middleAggOutput.isEmpty()) {
+ middleAggOutput = ImmutableList.of(new Alias(new
NullLiteral(TinyIntType.INSTANCE)));
+ }
+ return new PhysicalHashAggregate<>(localAgg.getGroupByExpressions(),
middleAggOutput,
+ Optional.ofNullable(partitionExpressions), bufferToBufferParam,
+
AggregateUtils.maybeUsingStreamAgg(localAgg.getGroupByExpressions(),
bufferToBufferParam),
+ null, localAgg);
+ }
+
+ protected PhysicalHashAggregate<? extends Plan>
splitDistinctTwoPhase(LogicalAggregate<? extends Plan> aggregate,
Review Comment:
Please add comments describing the plan shape transformation.
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggBaseRule.java:
##########
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.implementation;
+
+import org.apache.doris.nereids.trees.expressions.AggregateExpression;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
+import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam;
+import org.apache.doris.nereids.trees.expressions.functions.agg.Count;
+import org.apache.doris.nereids.trees.expressions.literal.NullLiteral;
+import org.apache.doris.nereids.trees.plans.AggMode;
+import org.apache.doris.nereids.trees.plans.AggPhase;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
+import org.apache.doris.nereids.types.TinyIntType;
+import org.apache.doris.nereids.util.AggregateUtils;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.nereids.util.Utils;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**SplitAggRule*/
+public abstract class SplitAggBaseRule {
+ protected PhysicalHashAggregate<? extends Plan>
splitDeduplicateOnePhase(LogicalAggregate<? extends Plan> aggregate,
+ Set<NamedExpression> localAggGroupBySet, AggregateParam
inputToBufferParam, AggregateParam paramForAggFunc,
+ Map<AggregateFunction, Alias> localAggFunctionToAlias, Plan child,
List<Expression> partitionExpressions) {
+ aggregate.getAggregateFunctions().stream()
+ .filter(aggFunc -> !aggFunc.isDistinct())
+ .collect(Collectors.toMap(
+ expr -> expr,
+ expr -> {
+ AggregateExpression localAggExpr = new
AggregateExpression(expr, paramForAggFunc);
+ return new Alias(localAggExpr);
+ },
+ (existing, replacement) -> existing,
+ () -> localAggFunctionToAlias
+ ));
+ List<NamedExpression> localAggOutput =
ImmutableList.<NamedExpression>builder()
+ .addAll(localAggGroupBySet)
+ .addAll(localAggFunctionToAlias.values())
+ .build();
+ List<Expression> localAggGroupBy =
Utils.fastToImmutableList(localAggGroupBySet);
+ boolean isGroupByEmptySelectEmpty = localAggGroupBy.isEmpty() &&
localAggOutput.isEmpty();
+ // be not recommend generate an aggregate node with empty group by and
empty output,
+ // so add a null int slot to group by slot and output
+ if (isGroupByEmptySelectEmpty) {
+ localAggGroupBy = ImmutableList.of(new
NullLiteral(TinyIntType.INSTANCE));
+ localAggOutput = ImmutableList.of(new Alias(new
NullLiteral(TinyIntType.INSTANCE)));
+ }
+ return new PhysicalHashAggregate<>(localAggGroupBy, localAggOutput,
Optional.ofNullable(partitionExpressions),
+ inputToBufferParam,
AggregateUtils.maybeUsingStreamAgg(localAggGroupBy, inputToBufferParam),
+ null, child);
+ }
+
+ protected PhysicalHashAggregate<? extends Plan>
splitDeduplicateTwoPhase(LogicalAggregate<? extends Plan> aggregate,
Review Comment:
Please add comments describing the plan shape transformation.
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggMultiPhaseWithoutGbyKey.java:
##########
@@ -0,0 +1,221 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.implementation;
+
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.rules.exploration.ExplorationRuleFactory;
+import org.apache.doris.nereids.trees.expressions.AggregateExpression;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
+import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam;
+import org.apache.doris.nereids.trees.expressions.functions.agg.AggregatePhase;
+import org.apache.doris.nereids.trees.expressions.functions.agg.Count;
+import org.apache.doris.nereids.trees.expressions.functions.agg.GroupConcat;
+import org.apache.doris.nereids.trees.expressions.functions.agg.Max;
+import org.apache.doris.nereids.trees.expressions.functions.agg.Min;
+import
org.apache.doris.nereids.trees.expressions.functions.agg.MultiDistinctCount;
+import
org.apache.doris.nereids.trees.expressions.functions.agg.MultiDistinctGroupConcat;
+import
org.apache.doris.nereids.trees.expressions.functions.agg.MultiDistinctSum;
+import
org.apache.doris.nereids.trees.expressions.functions.agg.MultiDistinctSum0;
+import
org.apache.doris.nereids.trees.expressions.functions.agg.MultiDistinction;
+import org.apache.doris.nereids.trees.expressions.functions.agg.Sum;
+import org.apache.doris.nereids.trees.expressions.functions.agg.Sum0;
+import org.apache.doris.nereids.trees.plans.AggMode;
+import org.apache.doris.nereids.trees.plans.AggPhase;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.algebra.Aggregate;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
+import org.apache.doris.nereids.util.AggregateUtils;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.nereids.util.Utils;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Supplier;
+
+/**SplitAggMultiPhaseWithoutGbyKey*/
+public class SplitAggMultiPhaseWithoutGbyKey extends SplitAggBaseRule
implements ExplorationRuleFactory {
Review Comment:
Rename to `SplitDistinctAggWithoutGbyKey`. Maybe you should use
`ImplementationRuleFactory`?
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java:
##########
@@ -110,74 +112,141 @@ public List<List<PhysicalProperties>>
visitPhysicalHashAggregate(
if (agg.getGroupByExpressions().isEmpty() &&
agg.getOutputExpressions().isEmpty()) {
return ImmutableList.of();
}
+ // If the origin attribute satisfies the group by key but does not
meet the requirements, ban the plan.
+ // e.g. select count(distinct a) from t group by b;
+ // requiredChildProperty: a
+ // but the child is already distributed by b
+ // ban this plan
+ PhysicalProperties originChildProperty =
originChildrenProperties.get(0);
+ PhysicalProperties requiredChildProperty = requiredProperties.get(0);
+ PhysicalProperties hashSpec =
PhysicalProperties.createHash(agg.getGroupByExpressions(), ShuffleType.REQUIRE);
+ GroupExpression child = children.get(0);
+ if (child.getPlan() instanceof PhysicalDistribute) {
+ PhysicalProperties properties = new PhysicalProperties(
+ DistributionSpecAny.INSTANCE,
originChildProperty.getOrderSpec());
+ GroupExpression distributeChild =
child.getOwnerGroup().getLowestCostPlan(properties).get().second;
+ PhysicalProperties distributeChildProperties =
distributeChild.getOutputProperties(properties);
+ if (distributeChildProperties.satisfy(hashSpec)
+ &&
!distributeChildProperties.satisfy(requiredChildProperty)) {
+ return ImmutableList.of();
+ }
+ }
+
if (!agg.getAggregateParam().canBeBanned) {
return visit(agg, context);
}
- // forbid one phase agg on distribute
- if (agg.getAggMode() == AggMode.INPUT_TO_RESULT &&
children.get(0).getPlan() instanceof PhysicalDistribute) {
- // this means one stage gather agg, usually bad pattern
+ // return aggBanByStatistics(agg, context);
+ if (shouldBanOnePhaseAgg(agg, requiredChildProperty)) {
return ImmutableList.of();
}
+ // process must shuffle
+ return visit(agg, context);
+ }
- // forbid TWO_PHASE_AGGREGATE_WITH_DISTINCT after shuffle
- // TODO: this is forbid good plan after cte reuse by mistake
- if (agg.getAggMode() == AggMode.INPUT_TO_BUFFER
- && requiredProperties.get(0).getDistributionSpec() instanceof
DistributionSpecHash
- && children.get(0).getPlan() instanceof PhysicalDistribute) {
- return ImmutableList.of();
+ /**
+ * 1. Generally, one-stage AGG is disabled unless the child of distribute
is a CTE consumer.
+ * 2. If it is a CTE consumer, to avoid being banned, ensure that
distribute is not a gather.
+ * Alternatively, if the distribute is a shuffle, ensure that the
shuffle expr is not skewed.
+ * */
+ private boolean shouldBanOnePhaseAgg(PhysicalHashAggregate<? extends Plan>
aggregate,
+ PhysicalProperties requiredChildProperty) {
+ if (banAggUnionAll(aggregate)) {
+ return true;
+ }
+ if (!onePhaseAggWithDistribute(aggregate)) {
+ return false;
}
+ if (childIsCTEConsumer()) {
+ // shape is agg-distribute-CTEConsumer
+ // distribute is gather
+ if (requireGather(requiredChildProperty)) {
+ return true;
+ }
+ // group by key is skew
+ if (skewOnShuffleExpr(aggregate)) {
+ return true;
+ }
+ return false;
- // agg(group by x)-union all(A, B)
- // no matter x.ndv is high or not, it is not worthwhile to shuffle A
and B by x
- // and hence we forbid one phase agg
- if (agg.getAggMode() == AggMode.INPUT_TO_RESULT
- && children.get(0).getPlan() instanceof PhysicalUnion
- && !((PhysicalUnion) children.get(0).getPlan()).isDistinct()) {
- return ImmutableList.of();
+ } else {
+ return true;
}
- // forbid multi distinct opt that bad than multi-stage version when
multi-stage can be executed in one fragment
- if (agg.getAggMode() == AggMode.INPUT_TO_BUFFER || agg.getAggMode() ==
AggMode.INPUT_TO_RESULT) {
- List<MultiDistinction> multiDistinctions =
agg.getOutputExpressions().stream()
- .filter(Alias.class::isInstance)
- .map(a -> ((Alias) a).child())
- .filter(AggregateExpression.class::isInstance)
- .map(a -> ((AggregateExpression) a).getFunction())
- .filter(MultiDistinction.class::isInstance)
- .map(MultiDistinction.class::cast)
- .collect(Collectors.toList());
- if (multiDistinctions.size() == 1) {
- Expression distinctChild = multiDistinctions.get(0).child(0);
- DistributionSpec childDistribution =
originChildrenProperties.get(0).getDistributionSpec();
- if (distinctChild instanceof SlotReference &&
childDistribution instanceof DistributionSpecHash) {
- SlotReference slotReference = (SlotReference)
distinctChild;
- DistributionSpecHash distributionSpecHash =
(DistributionSpecHash) childDistribution;
- List<ExprId> groupByColumns =
agg.getGroupByExpressions().stream()
- .map(SlotReference.class::cast)
- .map(SlotReference::getExprId)
- .collect(Collectors.toList());
- DistributionSpecHash groupByRequire = new
DistributionSpecHash(
- groupByColumns, ShuffleType.REQUIRE);
- List<ExprId> distinctChildColumns =
Lists.newArrayList(slotReference.getExprId());
- distinctChildColumns.add(slotReference.getExprId());
- DistributionSpecHash distinctChildRequire = new
DistributionSpecHash(
- distinctChildColumns, ShuffleType.REQUIRE);
- if ((!groupByColumns.isEmpty() &&
distributionSpecHash.satisfy(groupByRequire))
- || (groupByColumns.isEmpty() &&
distributionSpecHash.satisfy(distinctChildRequire))) {
- if (!agg.mustUseMultiDistinctAgg()) {
- return ImmutableList.of();
- }
- }
- }
- // if distinct without group by key, we prefer three or four
stage distinct agg
- // because the second phase of multi-distinct only have one
instance, and it is slow generally.
- if (agg.getOutputExpressions().size() == 1 &&
agg.getGroupByExpressions().isEmpty()
- && !agg.mustUseMultiDistinctAgg()) {
- return ImmutableList.of();
- }
+ }
+
+ private boolean skewOnShuffleExpr(PhysicalHashAggregate<? extends Plan>
agg) {
+ // if statistic is unknown -> not skew
+ Statistics aggStatistics =
agg.getGroupExpression().get().getOwnerGroup().getStatistics();
+ Statistics inputStatistics =
agg.getGroupExpression().get().childStatistics(0);
+ if (aggStatistics == null || inputStatistics == null) {
+ return false;
+ }
+ if (AggregateUtils.hasUnknownStatistics(agg.getGroupByExpressions(),
inputStatistics)) {
+ return false;
+ }
+ // There are two cases of skew:
+ double gbyNdv = aggStatistics.getRowCount();
+ // 1. ndv is very low
+ if (gbyNdv < LOW_NDV_THRESHOLD) {
+ return true;
+ }
+ // 2. There is a hot value, and the ndv of other keys is very low
+ if (isSkew(agg.getGroupByExpressions(), inputStatistics)) {
+ return true;
+ }
+ return false;
+ }
+
+ // if one group by key has hot value, and others ndv is low -> skew
+ private boolean isSkew(List<Expression> groupBy, Statistics
inputStatistics) {
+ for (int i = 0; i < groupBy.size(); ++i) {
+ Expression expr = groupBy.get(i);
+ ColumnStatistic colStat =
inputStatistics.findColumnStatistics(expr);
+ if (colStat == null) {
+ continue;
+ }
+ if (colStat.getHotValues() == null) {
+ continue;
+ }
+ List<Expression> otherExpr = excludeElement(groupBy, i);
+ double otherNdv =
StatsCalculator.estimateGroupByRowCount(otherExpr, inputStatistics);
+ if (otherNdv < LOW_NDV_THRESHOLD) {
+ return true;
}
}
- // process must shuffle
- return visit(agg, context);
+ return false;
+ }
+
+ private static <T> List<T> excludeElement(List<T> list, int index) {
+ List<T> newList = new ArrayList<>();
+ for (int i = 0; i < list.size(); i++) {
+ if (index != i) {
+ newList.add(list.get(i));
+ }
+ }
+ return newList;
+ }
+
+ private boolean onePhaseAggWithDistribute(PhysicalHashAggregate<? extends
Plan> aggregate) {
+ return aggregate.getAggMode() == AggMode.INPUT_TO_RESULT
+ && children.get(0).getPlan() instanceof PhysicalDistribute;
+ }
+
+ private boolean childIsCTEConsumer() {
+ return
children.get(0).children().get(0).getPhysicalExpressions().get(0).getPlan()
+ instanceof PhysicalCTEConsumer;
Review Comment:
Could this throw an NPE?
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SplitMultiDistinctStrategy.java:
##########
@@ -211,38 +160,28 @@ private static boolean
isDistinctMultiColumns(AggregateFunction func) {
return false;
}
- private static boolean needTransform(LogicalAggregate<Plan> agg,
List<Alias> aliases, List<Alias> otherAggFuncs) {
+ private static void collectInfo(LogicalAggregate<? extends Plan> agg,
List<List<Alias>> aliases,
Review Comment:
rename to collectDistinctAndNonDistinctFunctions
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java:
##########
@@ -1111,6 +1112,8 @@ public PlanFragment visitPhysicalHashAggregate(
// 2. collect agg expressions and generate agg function to slot
reference map
List<Slot> aggFunctionOutput = Lists.newArrayList();
ArrayList<FunctionCallExpr> execAggregateFunctions =
Lists.newArrayListWithCapacity(outputExpressions.size());
+ boolean isPartial =
aggregate.getAggregateParam().aggMode.productAggregateBuffer;
Review Comment:
@feiniaofeiafei why aggMode isPartial but contains partial aggregation
function
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAgg.java:
##########
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.implementation;
+
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.rules.exploration.OneExplorationRuleFactory;
+import org.apache.doris.nereids.trees.expressions.AggregateExpression;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
+import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam;
+import org.apache.doris.nereids.trees.expressions.functions.agg.AggregatePhase;
+import org.apache.doris.nereids.trees.plans.AggMode;
+import org.apache.doris.nereids.trees.plans.AggPhase;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.algebra.Aggregate;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
+import org.apache.doris.nereids.util.AggregateUtils;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.statistics.Statistics;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import java.util.List;
+import java.util.Map;
+
+/**SplitAgg
+ * only process agg without distinct function, split Agg into 2 phase: local
agg and global agg
+ * */
+public class SplitAgg extends OneExplorationRuleFactory {
+ public static final SplitAgg INSTANCE = new SplitAgg();
+
+ @Override
+ public Rule build() {
+ return logicalAggregate()
+ .whenNot(Aggregate::hasDistinctFunc)
+ .thenApplyMulti(ctx -> rewrite(ctx.root))
+ .toRule(RuleType.SPLIT_AGG);
+ }
+
+ private List<Plan> rewrite(LogicalAggregate<? extends Plan> aggregate) {
+ return ImmutableList.<Plan>builder()
+ .addAll(splitTwoPhase(aggregate))
+ .addAll(implementOnePhase(aggregate))
+ .build();
+ }
+
+ private List<Plan> implementOnePhase(LogicalAggregate<? extends Plan>
logicalAgg) {
Review Comment:
Add some comments that show the plan-shape transformation, like those on
`AggregateStrategies.onePhaseAggregateWithMultiDistinct`.
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SplitMultiDistinctStrategy.java:
##########
@@ -211,38 +160,28 @@ private static boolean
isDistinctMultiColumns(AggregateFunction func) {
return false;
}
Review Comment:
Is this function unused? If so, delete it.
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggBaseRule.java:
##########
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.implementation;
+
+import org.apache.doris.nereids.trees.expressions.AggregateExpression;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
+import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam;
+import org.apache.doris.nereids.trees.expressions.functions.agg.Count;
+import org.apache.doris.nereids.trees.expressions.literal.NullLiteral;
+import org.apache.doris.nereids.trees.plans.AggMode;
+import org.apache.doris.nereids.trees.plans.AggPhase;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
+import org.apache.doris.nereids.types.TinyIntType;
+import org.apache.doris.nereids.util.AggregateUtils;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.nereids.util.Utils;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**SplitAggRule*/
+public abstract class SplitAggBaseRule {
+ protected PhysicalHashAggregate<? extends Plan>
splitDeduplicateOnePhase(LogicalAggregate<? extends Plan> aggregate,
Review Comment:
add shape transformation comments
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAgg.java:
##########
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.implementation;
+
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.rules.exploration.OneExplorationRuleFactory;
+import org.apache.doris.nereids.trees.expressions.AggregateExpression;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
+import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam;
+import org.apache.doris.nereids.trees.expressions.functions.agg.AggregatePhase;
+import org.apache.doris.nereids.trees.plans.AggMode;
+import org.apache.doris.nereids.trees.plans.AggPhase;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.algebra.Aggregate;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
+import org.apache.doris.nereids.util.AggregateUtils;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.statistics.Statistics;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import java.util.List;
+import java.util.Map;
+
+/**SplitAgg
+ * only process agg without distinct function, split Agg into 2 phase: local
agg and global agg
+ * */
+public class SplitAgg extends OneExplorationRuleFactory {
+ public static final SplitAgg INSTANCE = new SplitAgg();
+
+ @Override
+ public Rule build() {
+ return logicalAggregate()
+ .whenNot(Aggregate::hasDistinctFunc)
+ .thenApplyMulti(ctx -> rewrite(ctx.root))
+ .toRule(RuleType.SPLIT_AGG);
+ }
+
+ private List<Plan> rewrite(LogicalAggregate<? extends Plan> aggregate) {
+ return ImmutableList.<Plan>builder()
+ .addAll(splitTwoPhase(aggregate))
+ .addAll(implementOnePhase(aggregate))
+ .build();
+ }
+
+ private List<Plan> implementOnePhase(LogicalAggregate<? extends Plan>
logicalAgg) {
+ if (!logicalAgg.supportAggregatePhase(AggregatePhase.ONE)) {
+ return ImmutableList.of();
+ }
+ ImmutableList.Builder<NamedExpression> builder =
ImmutableList.builder();
+ boolean changed = false;
+ for (NamedExpression expr : logicalAgg.getOutputExpressions()) {
+ if (expr instanceof Alias && expr.child(0) instanceof
AggregateFunction) {
+ Alias alias = (Alias) expr;
+ AggregateExpression aggExpr = new
AggregateExpression((AggregateFunction) expr.child(0),
+ AggregateParam.GLOBAL_RESULT);
+ builder.add(alias.withChildren(ImmutableList.of(aggExpr)));
+ changed = true;
+ } else {
+ builder.add(expr);
+ }
+ }
+ AggregateParam param = new AggregateParam(AggPhase.GLOBAL,
AggMode.INPUT_TO_RESULT, !skipRegulator(logicalAgg));
+ List<NamedExpression> aggOutput = changed ? builder.build() :
logicalAgg.getOutputExpressions();
+ return ImmutableList.of(new
PhysicalHashAggregate<>(logicalAgg.getGroupByExpressions(), aggOutput, param,
+
AggregateUtils.maybeUsingStreamAgg(logicalAgg.getGroupByExpressions(), param),
+ null, logicalAgg.child()));
+ }
+
+ private List<Plan> splitTwoPhase(LogicalAggregate<? extends Plan>
aggregate) {
Review Comment:
Add some comments that show the plan-shape transformation.
##########
fe/fe-core/src/test/java/org/apache/doris/planner/QueryCacheNormalizerTest.java:
##########
@@ -322,6 +332,7 @@ public void testSelectFromGroupBy() {
});
}
+ @Disabled
Review Comment:
This test should not be disabled.
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggMultiPhase.java:
##########
@@ -0,0 +1,366 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.implementation;
+
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.pattern.MatchingContext;
+import org.apache.doris.nereids.properties.ChildrenPropertiesRegulator;
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.rules.exploration.ExplorationRuleFactory;
+import org.apache.doris.nereids.trees.expressions.AggregateExpression;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.Cast;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.Mod;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
+import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam;
+import org.apache.doris.nereids.trees.expressions.functions.agg.AggregatePhase;
+import org.apache.doris.nereids.trees.expressions.functions.agg.Count;
+import org.apache.doris.nereids.trees.expressions.functions.agg.Sum0;
+import
org.apache.doris.nereids.trees.expressions.functions.agg.SupportMultiDistinct;
+import org.apache.doris.nereids.trees.expressions.functions.scalar.XxHash32;
+import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral;
+import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral;
+import org.apache.doris.nereids.trees.plans.AggMode;
+import org.apache.doris.nereids.trees.plans.AggPhase;
+import org.apache.doris.nereids.trees.plans.GroupPlan;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
+import org.apache.doris.nereids.types.DataType;
+import org.apache.doris.nereids.types.SmallIntType;
+import org.apache.doris.nereids.types.StringType;
+import org.apache.doris.nereids.types.TinyIntType;
+import org.apache.doris.nereids.util.AggregateUtils;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.nereids.util.TypeCoercionUtils;
+import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.statistics.Statistics;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**SplitAggMultiPhase
+ * only process agg with distinct function, split Agg into multi phase
+ * */
+public class SplitAggMultiPhase extends SplitAggBaseRule implements
ExplorationRuleFactory {
+ public static final SplitAggMultiPhase INSTANCE = new SplitAggMultiPhase();
+ private static final String SALT_EXPR = "saltExpr";
+
+ @Override
+ public List<Rule> buildRules() {
+ return ImmutableList.of(
+ logicalAggregate()
+ .when(agg -> !agg.getGroupByExpressions().isEmpty())
+ .when(agg -> agg.getDistinctArguments().size() == 1 ||
agg.distinctFuncNum() == 1)
+ .thenApplyMulti(this::rewrite)
+ .toRule(RuleType.SPLIT_AGG_MULTI_PHASE)
+ );
+ }
+
+ private List<Plan> rewrite(MatchingContext<LogicalAggregate<GroupPlan>>
ctx) {
+ LogicalAggregate<? extends Plan> aggregate = ctx.root;
+ if (aggregate.canSkewRewrite()) {
+ return ImmutableList.of(aggSkewRewrite(aggregate,
ctx.cascadesContext));
+ } else {
+ if (twoPlusOneBetterThanTwoPlusTwo(aggregate)) {
+ return ImmutableList.<Plan>builder()
+ .addAll(splitToTwoPlusOnePhase(aggregate))
+ .addAll(splitToOnePlusTwoPhase(aggregate))
+ .build();
+ } else {
+ return ImmutableList.<Plan>builder()
+ .addAll(splitToTwoPlusTwoPhase(aggregate))
+ .build();
+ }
+ }
+ }
+
+ /**
+ * select count(distinct a) group by b (deduplicated agg hashShuffle by
group by key b)
+ * splitToTwoPlusOnePhase:
+ * agg(group by b, count(a); distinct global)
+ * +--agg(group by a,b; global)
+ * +--hashShuffle(b)
+ * +--agg(group by a,b; local)
+ * agg(group by b, count(a); distinct global)
+ * +--agg(group by a,b; global)
+ * +--hashShuffle(b)
+ */
+ private List<Plan> splitToTwoPlusOnePhase(LogicalAggregate<? extends Plan>
aggregate) {
+ ImmutableList.Builder<Plan> builder = ImmutableList.builder();
+ if (aggregate.supportAggregatePhase(AggregatePhase.THREE)) {
+ Set<NamedExpression> localAggGroupBySet =
AggregateUtils.getAllKeySet(aggregate);
+ Map<AggregateFunction, Alias> middleAggFuncToAlias = new
LinkedHashMap<>();
+ Plan middleAgg = splitDeduplicateTwoPhase(aggregate,
middleAggFuncToAlias,
+ aggregate.getGroupByExpressions(), localAggGroupBySet);
+ builder.add(splitDistinctOnePhase(aggregate, middleAggFuncToAlias,
middleAgg));
+ }
+ if (aggregate.supportAggregatePhase(AggregatePhase.TWO)) {
+ Map<AggregateFunction, Alias> localAggFuncToAlias = new
HashMap<>();
+ Plan localAgg = splitToOnePhase(aggregate,
Utils.fastToImmutableList(aggregate.getGroupByExpressions()),
+ localAggFuncToAlias);
+ builder.add(splitDistinctOnePhase(aggregate, localAggFuncToAlias,
localAgg));
+ }
+ return builder.build();
+ }
+
+ private Plan splitToOnePhase(LogicalAggregate<? extends Plan> aggregate,
+ List<Expression> partitionExpressions, Map<AggregateFunction,
Alias> localAggFunctionToAlias) {
+ Set<NamedExpression> localAggGroupBySet =
AggregateUtils.getAllKeySet(aggregate);
+ // first phase
+ AggregateParam inputToResultParamFirst = new
AggregateParam(AggPhase.GLOBAL, AggMode.INPUT_TO_RESULT);
+ AggregateParam paramForAggFunc = new AggregateParam(AggPhase.GLOBAL,
AggMode.INPUT_TO_BUFFER);
+ return splitDeduplicateOnePhase(aggregate, localAggGroupBySet,
inputToResultParamFirst,
+ paramForAggFunc, localAggFunctionToAlias, aggregate.child(),
+ partitionExpressions);
+ }
+
+ /**
+ * select count(distinct a) group by b (deduplicated agg hashShuffle by
group by key b)
+ * splitToTwoPlusTwoPhase:
+ * agg(group by b, count(a); distinct global)
+ * +--hashShuffle(b)
+ * +--agg(group by b, count(a); distinct local)
+ * +--agg(group by a,b; global)
+ * +--hashShuffle(a,b)
+ * +--agg(group by a,b; local)
+ * agg(group by b, count(a); distinct global)
+ * +--hashShuffle(b)
+ * +--agg(group by b, count(a); distinct local)
+ * +--agg(group by a,b; global)
+ * +--hashShuffle(a,b)
+ */
+ private List<Plan> splitToTwoPlusTwoPhase(LogicalAggregate<? extends Plan>
aggregate) {
+ ImmutableList.Builder<Plan> builder = ImmutableList.builder();
+ Set<NamedExpression> localAggGroupBySet =
AggregateUtils.getAllKeySet(aggregate);
+ if (aggregate.supportAggregatePhase(AggregatePhase.FOUR)) {
+ Map<AggregateFunction, Alias> middleAggFunctionToAlias = new
LinkedHashMap<>();
+ Plan twoPhaseAgg = splitDeduplicateTwoPhase(aggregate,
middleAggFunctionToAlias,
+ Utils.fastToImmutableList(localAggGroupBySet),
localAggGroupBySet);
+ builder.add(splitDistinctTwoPhase(aggregate,
middleAggFunctionToAlias, twoPhaseAgg));
+ }
+ Map<AggregateFunction, Alias> localAggFunctionToAlias = new
HashMap<>();
+ Plan onePhaseAgg = splitToOnePhase(aggregate,
Utils.fastToImmutableList(localAggGroupBySet),
+ localAggFunctionToAlias);
+ if (aggregate.supportAggregatePhase(AggregatePhase.THREE)) {
+ builder.add(splitDistinctTwoPhase(aggregate,
localAggFunctionToAlias, onePhaseAgg));
+ }
+ if (aggregate.supportAggregatePhase(AggregatePhase.TWO)) {
+ builder.add(splitDistinctOnePhase(aggregate,
localAggFunctionToAlias, onePhaseAgg));
+ }
+ return builder.build();
+ }
+
+ /**
+ * select count(distinct a) group by b (deduplicated agg hashShuffle by
group by key b)
+ * splitToOnePlusTwoPhase: (deduplicated agg hashShuffle by distinct key a)
+ * agg(group by b, count(a); distinct global)
+ * +--hashShuffle(b)
+ * +--agg(group by b, count(a); distinct local)
+ * +--agg(group by a,b; global)
+ * +--hashShuffle(a)
+ */
+ private List<Plan> splitToOnePlusTwoPhase(LogicalAggregate<? extends Plan>
aggregate) {
+ if (!aggregate.supportAggregatePhase(AggregatePhase.THREE)) {
+ return ImmutableList.of();
+ }
+ Set<NamedExpression> localAggGroupBySet =
AggregateUtils.getAllKeySet(aggregate);
+ // first phase
+ AggregateParam paramForAgg = new AggregateParam(AggPhase.GLOBAL,
AggMode.INPUT_TO_RESULT);
+ AggregateParam paramForAggFunc = new AggregateParam(AggPhase.GLOBAL,
AggMode.INPUT_TO_BUFFER);
+
+ Map<AggregateFunction, Alias> localAggFunctionToAlias = new
LinkedHashMap<>();
+ Plan localAgg = splitDeduplicateOnePhase(aggregate,
localAggGroupBySet, paramForAgg, paramForAggFunc,
+ localAggFunctionToAlias, aggregate.child(),
+ Utils.fastToImmutableList(aggregate.getDistinctArguments()));
+ return ImmutableList.of(splitDistinctTwoPhase(aggregate,
localAggFunctionToAlias, localAgg));
+ }
+
+ private PhysicalHashAggregate<? extends Plan>
splitDistinctOnePhase(LogicalAggregate<? extends Plan> aggregate,
Review Comment:
add shape transformation comments
##########
fe/fe-core/src/test/java/org/apache/doris/planner/QueryCacheNormalizerTest.java:
##########
@@ -335,6 +346,7 @@ public void phasesAgg() {
Assertions.assertNotEquals(onePhaseAggPlans, twoPhaseAggPlans);
}
+ @Disabled
Review Comment:
This test should not be disabled.
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggMultiPhase.java:
##########
@@ -0,0 +1,366 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.implementation;
+
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.pattern.MatchingContext;
+import org.apache.doris.nereids.properties.ChildrenPropertiesRegulator;
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.rules.exploration.ExplorationRuleFactory;
+import org.apache.doris.nereids.trees.expressions.AggregateExpression;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.Cast;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.Mod;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
+import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam;
+import org.apache.doris.nereids.trees.expressions.functions.agg.AggregatePhase;
+import org.apache.doris.nereids.trees.expressions.functions.agg.Count;
+import org.apache.doris.nereids.trees.expressions.functions.agg.Sum0;
+import
org.apache.doris.nereids.trees.expressions.functions.agg.SupportMultiDistinct;
+import org.apache.doris.nereids.trees.expressions.functions.scalar.XxHash32;
+import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral;
+import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral;
+import org.apache.doris.nereids.trees.plans.AggMode;
+import org.apache.doris.nereids.trees.plans.AggPhase;
+import org.apache.doris.nereids.trees.plans.GroupPlan;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
+import org.apache.doris.nereids.types.DataType;
+import org.apache.doris.nereids.types.SmallIntType;
+import org.apache.doris.nereids.types.StringType;
+import org.apache.doris.nereids.types.TinyIntType;
+import org.apache.doris.nereids.util.AggregateUtils;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.nereids.util.TypeCoercionUtils;
+import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.statistics.Statistics;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**SplitAggMultiPhase
+ * only process agg with distinct function, split Agg into multi phase
+ * */
+public class SplitAggMultiPhase extends SplitAggBaseRule implements
ExplorationRuleFactory {
Review Comment:
Rename to `SplitDistinctAggWithGbyKey`. Also, maybe you should use
`ImplementationRuleFactory`?
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java:
##########
@@ -216,7 +216,8 @@ public void execute() {
// if break when running the loop above, the condition must be
false.
if (curChildIndex == groupExpression.arity()) {
if (!calculateEnforce(requestChildrenProperties,
outputChildrenProperties)) {
- return; // if error exists, return
+ clear();
Review Comment:
+1
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAgg.java:
##########
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.implementation;
+
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.rules.exploration.OneExplorationRuleFactory;
+import org.apache.doris.nereids.trees.expressions.AggregateExpression;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
+import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam;
+import org.apache.doris.nereids.trees.expressions.functions.agg.AggregatePhase;
+import org.apache.doris.nereids.trees.plans.AggMode;
+import org.apache.doris.nereids.trees.plans.AggPhase;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.algebra.Aggregate;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
+import org.apache.doris.nereids.util.AggregateUtils;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.statistics.Statistics;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import java.util.List;
+import java.util.Map;
+
+/**SplitAgg
+ * only process agg without distinct function, split Agg into 2 phase: local
agg and global agg
+ * */
+public class SplitAgg extends OneExplorationRuleFactory {
+ public static final SplitAgg INSTANCE = new SplitAgg();
+
+ @Override
+ public Rule build() {
+ return logicalAggregate()
+ .whenNot(Aggregate::hasDistinctFunc)
+ .thenApplyMulti(ctx -> rewrite(ctx.root))
+ .toRule(RuleType.SPLIT_AGG);
+ }
+
+ private List<Plan> rewrite(LogicalAggregate<? extends Plan> aggregate) {
+ return ImmutableList.<Plan>builder()
+ .addAll(splitTwoPhase(aggregate))
+ .addAll(implementOnePhase(aggregate))
+ .build();
+ }
+
+ private List<Plan> implementOnePhase(LogicalAggregate<? extends Plan>
logicalAgg) {
+ if (!logicalAgg.supportAggregatePhase(AggregatePhase.ONE)) {
+ return ImmutableList.of();
+ }
+ ImmutableList.Builder<NamedExpression> builder =
ImmutableList.builder();
+ boolean changed = false;
+ for (NamedExpression expr : logicalAgg.getOutputExpressions()) {
+ if (expr instanceof Alias && expr.child(0) instanceof
AggregateFunction) {
+ Alias alias = (Alias) expr;
+ AggregateExpression aggExpr = new
AggregateExpression((AggregateFunction) expr.child(0),
+ AggregateParam.GLOBAL_RESULT);
+ builder.add(alias.withChildren(ImmutableList.of(aggExpr)));
+ changed = true;
+ } else {
+ builder.add(expr);
+ }
+ }
+ AggregateParam param = new AggregateParam(AggPhase.GLOBAL,
AggMode.INPUT_TO_RESULT, !skipRegulator(logicalAgg));
+ List<NamedExpression> aggOutput = changed ? builder.build() :
logicalAgg.getOutputExpressions();
+ return ImmutableList.of(new
PhysicalHashAggregate<>(logicalAgg.getGroupByExpressions(), aggOutput, param,
+
AggregateUtils.maybeUsingStreamAgg(logicalAgg.getGroupByExpressions(), param),
+ null, logicalAgg.child()));
+ }
+
+ private List<Plan> splitTwoPhase(LogicalAggregate<? extends Plan>
aggregate) {
+ if (!aggregate.supportAggregatePhase(AggregatePhase.TWO)) {
+ return ImmutableList.of();
+ }
+ AggregateParam inputToBufferParam = new AggregateParam(AggPhase.LOCAL,
AggMode.INPUT_TO_BUFFER);
+ Map<AggregateFunction, Alias> aggFunctionToAlias =
aggregate.getAggregateFunctions().stream()
+ .collect(ImmutableMap.toImmutableMap(function -> function,
function -> {
+ AggregateExpression localAggFunc = new
AggregateExpression(function, inputToBufferParam);
+ return new Alias(localAggFunc);
+ }));
+ List<NamedExpression> localAggOutput =
ImmutableList.<NamedExpression>builder()
+ .addAll((List) aggregate.getGroupByExpressions())
+ .addAll(aggFunctionToAlias.values())
+ .build();
+
+ PhysicalHashAggregate<? extends Plan> localAgg = new
PhysicalHashAggregate<>(aggregate.getGroupByExpressions(),
+ localAggOutput, inputToBufferParam,
+
AggregateUtils.maybeUsingStreamAgg(aggregate.getGroupByExpressions(),
inputToBufferParam),
+ null, aggregate.child());
+
+ // global agg
+ AggregateParam bufferToResultParam = new
AggregateParam(AggPhase.GLOBAL, AggMode.BUFFER_TO_RESULT);
+ List<NamedExpression> globalAggOutput =
ExpressionUtils.rewriteDownShortCircuit(
+ aggregate.getOutputExpressions(), expr -> {
+ if (!(expr instanceof AggregateFunction)) {
+ return expr;
+ }
+ Alias alias = aggFunctionToAlias.get(expr);
+ if (alias == null) {
+ return expr;
+ }
+ AggregateFunction aggFunc = (AggregateFunction) expr;
+ return new AggregateExpression(aggFunc,
bufferToResultParam, alias.toSlot());
+ });
+ return ImmutableList.of(new
PhysicalHashAggregate<>(aggregate.getGroupByExpressions(),
+ globalAggOutput, bufferToResultParam,
+
AggregateUtils.maybeUsingStreamAgg(aggregate.getGroupByExpressions(),
bufferToResultParam),
+ aggregate.getLogicalProperties(), localAgg));
+ }
+
+ private boolean shouldUseLocalAgg(LogicalAggregate<? extends Plan>
aggregate) {
+ Statistics aggStats =
aggregate.getGroupExpression().get().getOwnerGroup().getStatistics();
+ Statistics aggChildStats =
aggregate.getGroupExpression().get().childStatistics(0);
+ // if gbyNdv is high, should not use local agg
+ double rows = aggChildStats.getRowCount();
+ double gbyNdv = aggStats.getRowCount();
+ return gbyNdv * 10 < rows;
+ }
Review Comment:
This function is not used; delete it.
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DistinctAggStrategySelector.java:
##########
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite;
+
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.jobs.JobContext;
+import
org.apache.doris.nereids.rules.rewrite.DistinctAggStrategySelector.DistinctSelectorContext;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCTEAnchor;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCTEProducer;
+import org.apache.doris.nereids.trees.plans.visitor.CustomRewriter;
+import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
+import org.apache.doris.nereids.util.AggregateUtils;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.statistics.ColumnStatistic;
+import org.apache.doris.statistics.Statistics;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Chooses the optimal execution strategy for queries with multiple DISTINCT
aggregations.
+ *
+ * Handles queries like "SELECT COUNT(DISTINCT c1), COUNT(DISTINCT c2) FROM t"
by selecting between:
+ * - CTE decomposition: Splits into multiple CTEs, each computing one DISTINCT
aggregate
+ * - Multi-DISTINCT function: Processes all distinct function use multi
distinct function
+ *
+ * Selection criteria includes:
+ * - Number of distinct aggregates
+ * - Estimated cardinality of distinct values
+ * - Available memory resources
+ * - Query complexity
+ */
+public class DistinctAggStrategySelector extends
DefaultPlanRewriter<DistinctSelectorContext>
+ implements CustomRewriter {
+ public static DistinctAggStrategySelector INSTANCE = new
DistinctAggStrategySelector();
+
+ /**DistinctSplitContext*/
+ public static class DistinctSelectorContext {
+ List<LogicalCTEProducer<? extends Plan>> cteProducerList;
+ StatementContext statementContext;
+ CascadesContext cascadesContext;
+
+ public DistinctSelectorContext(StatementContext statementContext,
CascadesContext cascadesContext) {
+ this.statementContext = statementContext;
+ this.cteProducerList = new ArrayList<>();
+ this.cascadesContext = cascadesContext;
+ }
+ }
+
+ @Override
+ public Plan rewriteRoot(Plan plan, JobContext jobContext) {
+ DistinctSelectorContext ctx = new DistinctSelectorContext(
+ jobContext.getCascadesContext().getStatementContext(),
jobContext.getCascadesContext());
+ plan = plan.accept(this, ctx);
+ for (int i = ctx.cteProducerList.size() - 1; i >= 0; i--) {
+ LogicalCTEProducer<? extends Plan> producer =
ctx.cteProducerList.get(i);
+ plan = new LogicalCTEAnchor<>(producer.getCteId(), producer, plan);
+ }
+ return plan;
+ }
+
+ @Override
+ public Plan visitLogicalCTEAnchor(
+ LogicalCTEAnchor<? extends Plan, ? extends Plan> anchor,
DistinctSelectorContext ctx) {
+ Plan child1 = anchor.child(0).accept(this, ctx);
+ DistinctSelectorContext consumerContext =
+ new DistinctSelectorContext(ctx.statementContext,
ctx.cascadesContext);
+ Plan child2 = anchor.child(1).accept(this, consumerContext);
+ for (int i = consumerContext.cteProducerList.size() - 1; i >= 0; i--) {
+ LogicalCTEProducer<? extends Plan> producer =
consumerContext.cteProducerList.get(i);
+ child2 = new LogicalCTEAnchor<>(producer.getCteId(), producer,
child2);
+ }
+ return anchor.withChildren(ImmutableList.of(child1, child2));
+ }
+
+ @Override
+ public Plan visitLogicalAggregate(LogicalAggregate<? extends Plan> agg,
DistinctSelectorContext ctx) {
+ Plan newChild = agg.child().accept(this, ctx);
+ agg = agg.withChildren(ImmutableList.of(newChild));
+ // process:
+ // count(distinct a,b);
+ // count(distinct a), sum(distinct a);
+ // count(distinct a)
+ // not process:
+ // count(distinct a,b), count(distinct a,c)
+ // count(distinct a), sum(distinct b)
+ if (agg.distinctFuncNum() < 2 || agg.getDistinctArguments().size() <
2) {
+ return agg;
+ }
+ if (shouldUseMultiDistinct(agg)) {
+ return MultiDistinctFunctionStrategy.rewrite(agg);
+ } else {
+ return SplitMultiDistinctStrategy.rewrite(agg, ctx);
+ }
+ }
+
+ private boolean shouldUseMultiDistinct(LogicalAggregate<? extends Plan>
agg) {
+ if (AggregateUtils.containsCountDistinctMultiExpr(agg)) {
+ return false;
+ }
+ if (ConnectContext.get().getSessionVariable().multiDistinctStrategy ==
1) {
+ return true;
+ } else if
(ConnectContext.get().getSessionVariable().multiDistinctStrategy == 2) {
+ return false;
+ }
Review Comment:
Can you document the meaning of each value of `multiDistinctStrategy`?
Also, please do not evaluate
`ConnectContext.get().getSessionVariable().multiDistinctStrategy` twice.
##########
fe/fe-core/src/test/java/org/apache/doris/planner/QueryCacheNormalizerTest.java:
##########
@@ -299,6 +308,7 @@ public void testSelectFromWhereNoGroupBy() throws Throwable
{
Assertions.assertEquals(queryCacheParam5.tablet_to_range,
queryCacheParam6.tablet_to_range);
}
+ @Disabled
Review Comment:
This test should not be disabled.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]