This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-4.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 61964b75ebfe775de724fd412be364d4458970ac Author: feiniaofeiafei <[email protected]> AuthorDate: Fri Mar 13 19:47:47 2026 +0800 branch-4.1:(fix)(agg) Adjust agg strategy when table satisfy distinct key distribution #61248 (#61297) picked from #61248 --- .../rules/rewrite/DistinctAggregateRewriter.java | 130 +++++++++-- .../rewrite/DistinctAggregateRewriterTest.java | 116 ++++++++++ .../agg_skew_rewrite/agg_skew_rewrite.out | 6 +- .../nereids_rules_p0/agg_strategy/agg_strategy.out | 240 ++++++++++----------- .../agg_strategy/distinct_agg_rewriter.out | 12 +- .../distinct_agg_strategy_selector.out | 14 +- .../agg_strategy/physical_agg_regulator.out | 14 +- .../distinct_split/disitinct_split.out | 43 ++-- .../data/shape_check/clickbench/query10.out | 9 +- .../data/shape_check/clickbench/query11.out | 7 +- .../data/shape_check/clickbench/query12.out | 7 +- .../data/shape_check/clickbench/query14.out | 7 +- .../data/shape_check/clickbench/query23.out | 7 +- .../data/shape_check/clickbench/query9.out | 5 +- .../agg_strategy/agg_strategy.groovy | 1 + .../agg_strategy/distinct_agg_rewriter.groovy | 1 + 16 files changed, 402 insertions(+), 217 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DistinctAggregateRewriter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DistinctAggregateRewriter.java index da0aa21308b..c2668751dae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DistinctAggregateRewriter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DistinctAggregateRewriter.java @@ -17,13 +17,18 @@ package org.apache.doris.nereids.rules.rewrite; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.DistributionInfo; +import org.apache.doris.catalog.HashDistributionInfo; +import org.apache.doris.catalog.OlapTable; import org.apache.doris.nereids.rules.Rule; import org.apache.doris.nereids.rules.RuleType; import org.apache.doris.nereids.rules.rewrite.StatsDerive.DeriveContext; -import org.apache.doris.nereids.stats.ExpressionEstimation; import org.apache.doris.nereids.trees.expressions.Alias; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction; import org.apache.doris.nereids.trees.expressions.functions.agg.AnyValue; import org.apache.doris.nereids.trees.expressions.functions.agg.Count; @@ -34,11 +39,13 @@ import org.apache.doris.nereids.trees.expressions.functions.agg.Sum0; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.algebra.Aggregate; import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate; +import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; +import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; +import org.apache.doris.nereids.trees.plans.logical.LogicalProject; import org.apache.doris.nereids.util.AggregateUtils; import org.apache.doris.nereids.util.ExpressionUtils; import org.apache.doris.nereids.util.Utils; import org.apache.doris.qe.ConnectContext; -import org.apache.doris.statistics.ColumnStatistic; import org.apache.doris.statistics.Statistics; import com.google.common.annotations.VisibleForTesting; @@ -70,6 +77,7 @@ import java.util.Set; */ public class DistinctAggregateRewriter implements RewriteRuleFactory { public static final DistinctAggregateRewriter INSTANCE = new DistinctAggregateRewriter(); + private static final double MULTI_DISTINCT_GBY_PER_INSTANCE_THRESHOLD = 30.0; // TODO: add other functions private static final Set<Class<? extends AggregateFunction>> supportSplitOtherFunctions = ImmutableSet.of( Sum.class, Min.class, Max.class, Count.class, Sum0.class, AnyValue.class); @@ -113,21 +121,115 @@ public class DistinctAggregateRewriter implements RewriteRuleFactory { // has unknown statistics, split to bottom and top agg if (AggregateUtils.hasUnknownStatistics(aggregate.getGroupByExpressions(), aggChildStats) || AggregateUtils.hasUnknownStatistics(dstArgs, aggChildStats)) { - return true; + return !isDistinctKeySatisfyDistribution(aggregate); } double gbyNdv = aggStats.getRowCount(); - Expression dstKey = dstArgs.iterator().next(); - ColumnStatistic dstKeyStats = aggChildStats.findColumnStatistics(dstKey); - if (dstKeyStats == null) { - dstKeyStats = ExpressionEstimation.estimate(dstKey, aggChildStats); - } - double dstNdv = dstKeyStats.ndv; - double inputRows = aggChildStats.getRowCount(); - // group by key ndv is low, distinct key ndv is high, multi_distinct is better - // otherwise split to bottom and top agg - return gbyNdv < inputRows * AggregateUtils.LOW_CARDINALITY_THRESHOLD - && dstNdv > inputRows * AggregateUtils.HIGH_CARDINALITY_THRESHOLD; + int instanceNum = getParallelExecInstanceNum(ConnectContext.get()); + if (instanceNum <= 0) { + instanceNum = 1; + } + return gbyNdv / instanceNum <= MULTI_DISTINCT_GBY_PER_INSTANCE_THRESHOLD; + } + + private int getParallelExecInstanceNum(ConnectContext ctx) { + if (ctx == null) { + return 1; + } + return ctx.getSessionVariable() + .getParallelExecInstanceNum(ctx.getSessionVariable().resolveCloudClusterName(ctx)); + } + + private boolean isDistinctKeySatisfyDistribution(LogicalAggregate<? extends Plan> aggregate) { + DistinctDistributionInfo info = resolveDistinctDistributionInfo(aggregate); + if (info == null) { + return false; + } + Set<String> distinctColumnNames = new HashSet<>(); + for (SlotReference slot : info.distinctSlots) { + if (!slot.getOriginalColumn().isPresent()) { + return false; + } + distinctColumnNames.add(slot.getOriginalColumn().get().getName().toLowerCase()); + } + DistributionInfo distributionInfo = info.table.getDefaultDistributionInfo(); + if (!(distributionInfo instanceof HashDistributionInfo)) { + return false; + } + List<Column> distributionColumns = ((HashDistributionInfo) distributionInfo).getDistributionColumns(); + if (distributionColumns.isEmpty()) { + return false; + } + for (Column column : distributionColumns) { + if (!distinctColumnNames.contains(column.getName().toLowerCase())) { + return false; + } + } + return true; + } + + private DistinctDistributionInfo resolveDistinctDistributionInfo(LogicalAggregate<? extends Plan> aggregate) { + Set<Expression> distinctArgs = aggregate.getDistinctArguments(); + if (distinctArgs.isEmpty()) { + return null; + } + Set<SlotReference> distinctSlots = new HashSet<>(); + for (Expression expression : distinctArgs) { + if (!(expression instanceof SlotReference)) { + return null; + } + distinctSlots.add((SlotReference) expression); + } + Plan child = aggregate.child(); + while (child instanceof LogicalProject || child instanceof LogicalFilter) { + if (child instanceof LogicalProject) { + LogicalProject<? extends Plan> project = (LogicalProject<? extends Plan>) child; + Map<Slot, Expression> projectExprMap = new HashMap<>(); + for (NamedExpression namedExpression : project.getProjects()) { + Expression projectExpr = namedExpression; + if (namedExpression instanceof Alias) { + projectExpr = ((Alias) namedExpression).child(); + } + projectExprMap.put(namedExpression.toSlot(), projectExpr); + } + Set<SlotReference> replaced = new HashSet<>(); + for (SlotReference slot : distinctSlots) { + Expression projectExpr = projectExprMap.get(slot); + if (!(projectExpr instanceof SlotReference)) { + return null; + } + replaced.add((SlotReference) projectExpr); + } + distinctSlots = replaced; + child = project.child(); + continue; + } + child = ((LogicalFilter<? extends Plan>) child).child(); + } + if (!(child instanceof LogicalOlapScan)) { + return null; + } + OlapTable olapTable = ((LogicalOlapScan) child).getTable(); + if (olapTable == null) { + return null; + } + for (SlotReference slot : distinctSlots) { + if (!slot.getOriginalTable().isPresent() + || slot.getOriginalTable().get() != olapTable) { + return null; + } + } + return new DistinctDistributionInfo(olapTable, distinctSlots); + } + + private static class DistinctDistributionInfo { + private final OlapTable table; + private final Set<SlotReference> distinctSlots; + + private DistinctDistributionInfo(OlapTable table, Set<SlotReference> distinctSlots) { + this.table = table; + this.distinctSlots = distinctSlots; + } } private Plan rewrite(LogicalAggregate<? extends Plan> aggregate, ConnectContext ctx) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/DistinctAggregateRewriterTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/DistinctAggregateRewriterTest.java index fcab4e2ae8c..05c2180840c 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/DistinctAggregateRewriterTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/DistinctAggregateRewriterTest.java @@ -17,22 +17,34 @@ package org.apache.doris.nereids.rules.rewrite; +import org.apache.doris.nereids.rules.analysis.LogicalSubQueryAliasToLogicalProject; import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction; import org.apache.doris.nereids.trees.expressions.functions.agg.Count; import org.apache.doris.nereids.trees.expressions.functions.agg.MultiDistinctCount; import org.apache.doris.nereids.trees.expressions.functions.agg.MultiDistinctGroupConcat; import org.apache.doris.nereids.trees.expressions.functions.agg.Sum0; import org.apache.doris.nereids.trees.expressions.functions.scalar.If; +import org.apache.doris.nereids.trees.plans.AbstractPlan; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate; import org.apache.doris.nereids.util.MemoPatternMatchSupported; import org.apache.doris.nereids.util.PlanChecker; +import org.apache.doris.qe.SessionVariable; +import org.apache.doris.statistics.ColumnStatistic; +import org.apache.doris.statistics.ColumnStatisticBuilder; +import org.apache.doris.statistics.Statistics; import org.apache.doris.utframe.TestWithFeService; +import com.google.common.collect.ImmutableMap; import mockit.Mock; import mockit.MockUp; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + public class DistinctAggregateRewriterTest extends TestWithFeService implements MemoPatternMatchSupported { @Override protected void runBeforeAll() throws Exception { @@ -42,6 +54,14 @@ public class DistinctAggregateRewriterTest extends TestWithFeService implements + "distributed by hash(a) properties('replication_num' = '1');"); connectContext.setDatabase("test"); connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION"); + new SessionVariableMockUp(); + } + + private static class SessionVariableMockUp extends MockUp<SessionVariable> { + @Mock + private int getParallelExecInstanceNum(String clusterName) { + return 24; + } } private void applyMock() { @@ -198,5 +218,101 @@ public class DistinctAggregateRewriterTest extends TestWithFeService implements && agg.getAggregateFunctions().stream().noneMatch(AggregateFunction::isDistinct) && agg.getAggregateFunctions().stream().anyMatch(f -> f instanceof MultiDistinctCount) )); + connectContext.getSessionVariable().setAggPhase(0); + } + + @Test + void testShouldUseMultiDistinctWithoutStatsSatisfyDistribution() throws Exception { + DistinctAggregateRewriter rewriter = DistinctAggregateRewriter.INSTANCE; + LogicalAggregate<? extends Plan> aggregate = getLogicalAggregate( + "select bb, count(distinct aa) from " + + "(select a as aa, b as bb from test.distinct_agg_split_t where b > 1) t " + + "group by bb" + ); + Plan child = aggregate.child(); + Map<org.apache.doris.nereids.trees.expressions.Expression, ColumnStatistic> colStats = new HashMap<>(); + aggregate.getGroupByExpressions().forEach(expr -> + colStats.put(expr, unknownColumnStats())); + aggregate.getDistinctArguments().forEach(expr -> + colStats.put(expr, unknownColumnStats())); + ((AbstractPlan) child).setStatistics(new Statistics(10000, colStats)); + aggregate.setStatistics(new Statistics(100, ImmutableMap.of())); + + Assertions.assertFalse(rewriter.shouldUseMultiDistinct(aggregate)); + } + + @Test + void testShouldUseMultiDistinctWithStatsSelected() throws Exception { + DistinctAggregateRewriter rewriter = new DistinctAggregateRewriter(); + LogicalAggregate<? extends Plan> aggregate = getLogicalAggregate( + "select b, count(distinct a) from test.distinct_agg_split_t group by b" + ); + Plan child = aggregate.child(); + Map<org.apache.doris.nereids.trees.expressions.Expression, ColumnStatistic> colStats = new HashMap<>(); + aggregate.getGroupByExpressions().forEach(expr -> + colStats.put(expr, buildColumnStats(240, false))); + aggregate.getDistinctArguments().forEach(expr -> + colStats.put(expr, buildColumnStats(10000.0, false))); + ((AbstractPlan) child).setStatistics(new Statistics(100000, colStats)); + aggregate.setStatistics(new Statistics(240, ImmutableMap.of())); + + Assertions.assertTrue(rewriter.shouldUseMultiDistinct(aggregate)); + } + + @Test + void testShouldUseMultiDistinctWithStatsNotSelected() throws Exception { + DistinctAggregateRewriter rewriter = new DistinctAggregateRewriter(); + LogicalAggregate<? extends Plan> aggregate = getLogicalAggregate( + "select b, count(distinct a) from test.distinct_agg_split_t group by b" + ); + Plan child = aggregate.child(); + Map<org.apache.doris.nereids.trees.expressions.Expression, ColumnStatistic> colStats = new HashMap<>(); + aggregate.getGroupByExpressions().forEach(expr -> + colStats.put(expr, buildColumnStats(1000.0, false))); + aggregate.getDistinctArguments().forEach(expr -> + colStats.put(expr, buildColumnStats(10000.0, false))); + ((AbstractPlan) child).setStatistics(new Statistics(100000, colStats)); + aggregate.setStatistics(new Statistics(1000.0, ImmutableMap.of())); + + Assertions.assertFalse(rewriter.shouldUseMultiDistinct(aggregate)); + } + + private LogicalAggregate<? extends Plan> getLogicalAggregate(String sql) { + Plan plan = PlanChecker.from(connectContext) + .analyze(sql) + .applyTopDown(new LogicalSubQueryAliasToLogicalProject()) + .getPlan(); + Optional<LogicalAggregate<? extends Plan>> aggregate = findAggregate(plan); + Assertions.assertTrue(aggregate.isPresent()); + return aggregate.get(); + } + + private Optional<LogicalAggregate<? extends Plan>> findAggregate(Plan plan) { + if (plan instanceof LogicalAggregate) { + return Optional.of((LogicalAggregate<? extends Plan>) plan); + } + for (Plan child : plan.children()) { + Optional<LogicalAggregate<? extends Plan>> found = findAggregate(child); + if (found.isPresent()) { + return found; + } + } + return Optional.empty(); + } + + private ColumnStatistic unknownColumnStats() { + return buildColumnStats(0.0, true); + } + + private ColumnStatistic buildColumnStats(double ndv, boolean isUnknown) { + return new ColumnStatisticBuilder(1) + .setNdv(ndv) + .setAvgSizeByte(4) + .setNumNulls(0) + .setMinValue(0) + .setMaxValue(100) + .setIsUnknown(isUnknown) + .setUpdatedTime("") + .build(); } } diff --git a/regression-test/data/nereids_rules_p0/agg_skew_rewrite/agg_skew_rewrite.out b/regression-test/data/nereids_rules_p0/agg_skew_rewrite/agg_skew_rewrite.out index 813a08ce40d..ba513d54b02 100644 --- a/regression-test/data/nereids_rules_p0/agg_skew_rewrite/agg_skew_rewrite.out +++ b/regression-test/data/nereids_rules_p0/agg_skew_rewrite/agg_skew_rewrite.out @@ -564,10 +564,12 @@ PhysicalResultSink PhysicalResultSink --hashAgg[GLOBAL] ----hashAgg[LOCAL] -------PhysicalOlapScan[test_skew_hint] +------hashAgg[GLOBAL] +--------PhysicalOlapScan[test_skew_hint] -- !shape_not_rewrite -- PhysicalResultSink --hashAgg[GLOBAL] -----PhysicalOlapScan[test_skew_hint] +----hashAgg[GLOBAL] +------PhysicalOlapScan[test_skew_hint] diff --git a/regression-test/data/nereids_rules_p0/agg_strategy/agg_strategy.out b/regression-test/data/nereids_rules_p0/agg_strategy/agg_strategy.out index ba448a756a2..a718649a385 100644 --- a/regression-test/data/nereids_rules_p0/agg_strategy/agg_strategy.out +++ b/regression-test/data/nereids_rules_p0/agg_strategy/agg_strategy.out @@ -60,40 +60,40 @@ 1 -- !agg_distinct_with_gby_key_with_other_func -- -1 1 27 27.0 -1 2 76 76.0 -1 3 42 42.0 -1 4 64 64.0 -1 5 18 18.0 -1 6 91 91.0 -1 7 13 13.0 -1 8 33 33.0 -1 9 55 55.0 -1 10 100 100.0 +1 1 27 27 +1 2 76 76 +1 3 42 42 +1 4 64 64 +1 5 18 18 +1 6 91 91 +1 7 13 13 +1 8 33 33 +1 9 55 55 +1 10 100 100 -- !agg_distinct_satisfy_gby_key_with_other_func -- -1 1 42 42.0 -1 2 18 18.0 -1 3 76 76.0 -1 4 33 33.0 -1 5 91 91.0 -1 6 27 27.0 -1 7 64 64.0 -1 8 55 55.0 -1 9 13 13.0 -1 10 100 100.0 +1 1 42 42 +1 2 18 18 +1 3 76 76 +1 4 33 33 +1 5 91 91 +1 6 27 27 +1 7 64 64 +1 8 55 55 +1 9 13 13 +1 10 100 100 -- !agg_distinct_satisfy_dst_key_with_other_func -- -1 13 13.0 -1 18 18.0 -1 27 27.0 -1 33 33.0 -1 42 42.0 -1 55 55.0 -1 64 64.0 -1 76 76.0 -1 91 91.0 -1 100 100.0 +1 13 13 +1 18 18 +1 27 27 +1 33 33 +1 42 42 +1 55 55 +1 64 64 +1 76 76 +1 91 91 +1 100 100 -- !agg_distinct_without_gby_key -- 10 @@ -153,7 +153,8 @@ PhysicalResultSink --------hashAgg[GLOBAL] ----------PhysicalDistribute[DistributionSpecHash] ------------hashAgg[LOCAL] ---------------PhysicalOlapScan[t_gbykey_10_dstkey_10_1000_id] +--------------hashAgg[GLOBAL] +----------------PhysicalOlapScan[t_gbykey_10_dstkey_10_1000_id] -- !agg_distinct_with_gby_key_with_other_func -- PhysicalResultSink @@ -178,10 +179,11 @@ PhysicalResultSink --PhysicalQuickSort[MERGE_SORT] ----PhysicalDistribute[DistributionSpecGather] ------PhysicalQuickSort[LOCAL_SORT] ---------hashAgg[GLOBAL] +--------hashAgg[DISTINCT_GLOBAL] ----------PhysicalDistribute[DistributionSpecHash] -------------hashAgg[LOCAL] ---------------PhysicalOlapScan[t_gbykey_10_dstkey_10_1000_id] +------------hashAgg[DISTINCT_LOCAL] +--------------hashAgg[GLOBAL] +----------------PhysicalOlapScan[t_gbykey_10_dstkey_10_1000_id] -- !agg_distinct_without_gby_key -- PhysicalResultSink @@ -279,40 +281,40 @@ PhysicalResultSink 1 -- !agg_distinct_with_gby_key_with_other_func -- -1 1 27 27.0 -1 2 76 76.0 -1 3 42 42.0 -1 4 64 64.0 -1 5 18 18.0 -1 6 91 91.0 -1 7 13 13.0 -1 8 33 33.0 -1 9 55 55.0 -1 10 100 100.0 +1 1 27 27 +1 2 76 76 +1 3 42 42 +1 4 64 64 +1 5 18 18 +1 6 91 91 +1 7 13 13 +1 8 33 33 +1 9 55 55 +1 10 100 100 -- !agg_distinct_satisfy_gby_key_with_other_func -- -1 1 42 42.0 -1 2 18 18.0 -1 3 76 76.0 -1 4 33 33.0 -1 5 91 91.0 -1 6 27 27.0 -1 7 64 64.0 -1 8 55 55.0 -1 9 13 13.0 -1 10 100 100.0 +1 1 42 42 +1 2 18 18 +1 3 76 76 +1 4 33 33 +1 5 91 91 +1 6 27 27 +1 7 64 64 +1 8 55 55 +1 9 13 13 +1 10 100 100 -- !agg_distinct_satisfy_dst_key_with_other_func -- -1 13 13.0 -1 18 18.0 -1 27 27.0 -1 33 33.0 -1 42 42.0 -1 55 55.0 -1 64 64.0 -1 76 76.0 -1 91 91.0 -1 100 100.0 +1 13 13 +1 18 18 +1 27 27 +1 33 33 +1 42 42 +1 55 55 +1 64 64 +1 76 76 +1 91 91 +1 100 100 -- !agg_distinct_without_gby_key -- 10 @@ -354,10 +356,7 @@ PhysicalResultSink --------hashAgg[GLOBAL] ----------PhysicalDistribute[DistributionSpecHash] ------------hashAgg[LOCAL] ---------------hashAgg[GLOBAL] -----------------PhysicalDistribute[DistributionSpecHash] -------------------hashAgg[LOCAL] ---------------------PhysicalOlapScan[t_gbykey_10_dstkey_10_1000_id] +--------------PhysicalOlapScan[t_gbykey_10_dstkey_10_1000_id] -- !agg_distinct_satisfy_gby_key -- PhysicalResultSink @@ -365,8 +364,7 @@ PhysicalResultSink ----PhysicalDistribute[DistributionSpecGather] ------PhysicalQuickSort[LOCAL_SORT] --------hashAgg[GLOBAL] -----------hashAgg[GLOBAL] -------------PhysicalOlapScan[t_gbykey_10_dstkey_10_1000_id] +----------PhysicalOlapScan[t_gbykey_10_dstkey_10_1000_id] -- !agg_distinct_satisfy_dst_key -- PhysicalResultSink @@ -376,41 +374,35 @@ PhysicalResultSink --------hashAgg[GLOBAL] ----------PhysicalDistribute[DistributionSpecHash] ------------hashAgg[LOCAL] ---------------hashAgg[GLOBAL] -----------------PhysicalOlapScan[t_gbykey_10_dstkey_10_1000_id] +--------------PhysicalOlapScan[t_gbykey_10_dstkey_10_1000_id] -- !agg_distinct_with_gby_key_with_other_func -- PhysicalResultSink --PhysicalQuickSort[MERGE_SORT] ----PhysicalDistribute[DistributionSpecGather] ------PhysicalQuickSort[LOCAL_SORT] ---------hashAgg[DISTINCT_GLOBAL] +--------hashAgg[GLOBAL] ----------PhysicalDistribute[DistributionSpecHash] -------------hashAgg[DISTINCT_LOCAL] ---------------hashAgg[GLOBAL] -----------------PhysicalDistribute[DistributionSpecHash] -------------------hashAgg[LOCAL] ---------------------PhysicalOlapScan[t_gbykey_10_dstkey_10_1000_id] +------------hashAgg[LOCAL] +--------------PhysicalOlapScan[t_gbykey_10_dstkey_10_1000_id] -- !agg_distinct_satisfy_gby_key_with_other_func -- PhysicalResultSink --PhysicalQuickSort[MERGE_SORT] ----PhysicalDistribute[DistributionSpecGather] ------PhysicalQuickSort[LOCAL_SORT] ---------hashAgg[DISTINCT_GLOBAL] -----------hashAgg[GLOBAL] -------------PhysicalOlapScan[t_gbykey_10_dstkey_10_1000_id] +--------hashAgg[GLOBAL] +----------PhysicalOlapScan[t_gbykey_10_dstkey_10_1000_id] -- !agg_distinct_satisfy_dst_key_with_other_func -- PhysicalResultSink --PhysicalQuickSort[MERGE_SORT] ----PhysicalDistribute[DistributionSpecGather] ------PhysicalQuickSort[LOCAL_SORT] ---------hashAgg[DISTINCT_GLOBAL] +--------hashAgg[GLOBAL] ----------PhysicalDistribute[DistributionSpecHash] -------------hashAgg[DISTINCT_LOCAL] ---------------hashAgg[GLOBAL] -----------------PhysicalOlapScan[t_gbykey_10_dstkey_10_1000_id] +------------hashAgg[LOCAL] +--------------PhysicalOlapScan[t_gbykey_10_dstkey_10_1000_id] -- !agg_distinct_without_gby_key -- PhysicalResultSink @@ -492,16 +484,16 @@ PhysicalResultSink 2 -- !agg_distinct_with_gby_key_with_other_func_low_ndv -- -2 0 60 4.0 -2 1 75 5.0 +2 0 60 4 +2 1 75 5 -- !agg_distinct_satisfy_gby_key_with_other_func_low_ndv -- 2 0 69 4.3125 2 1 66 4.714285714285714 -- !agg_distinct_satisfy_dst_key_with_other_func_low_ndv -- -2 60 4.0 -2 75 5.0 +2 60 4 +2 75 5 -- !agg_distinct_without_gby_key_low_ndv -- 2 @@ -561,7 +553,8 @@ PhysicalResultSink --------hashAgg[GLOBAL] ----------PhysicalDistribute[DistributionSpecHash] ------------hashAgg[LOCAL] ---------------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id] +--------------hashAgg[GLOBAL] +----------------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id] -- !agg_distinct_with_gby_key_with_other_func_low_ndv -- PhysicalResultSink @@ -586,10 +579,11 @@ PhysicalResultSink --PhysicalQuickSort[MERGE_SORT] ----PhysicalDistribute[DistributionSpecGather] ------PhysicalQuickSort[LOCAL_SORT] ---------hashAgg[GLOBAL] +--------hashAgg[DISTINCT_GLOBAL] ----------PhysicalDistribute[DistributionSpecHash] -------------hashAgg[LOCAL] ---------------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id] +------------hashAgg[DISTINCT_LOCAL] +--------------hashAgg[GLOBAL] +----------------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id] -- !agg_distinct_without_gby_key_low_ndv -- PhysicalResultSink @@ -647,16 +641,16 @@ PhysicalResultSink 2 -- !agg_distinct_with_gby_key_with_other_func_low_ndv -- -2 0 60 4.0 -2 1 75 5.0 +2 0 60 4 +2 1 75 5 -- !agg_distinct_satisfy_gby_key_with_other_func_low_ndv -- 2 0 69 4.3125 2 1 66 4.714285714285714 -- !agg_distinct_satisfy_dst_key_with_other_func_low_ndv -- -2 60 4.0 -2 75 5.0 +2 60 4 +2 75 5 -- !agg_distinct_without_gby_key_low_ndv -- 2 @@ -698,10 +692,7 @@ PhysicalResultSink --------hashAgg[GLOBAL] ----------PhysicalDistribute[DistributionSpecHash] ------------hashAgg[LOCAL] ---------------hashAgg[GLOBAL] -----------------PhysicalDistribute[DistributionSpecHash] -------------------hashAgg[LOCAL] ---------------------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id] +--------------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id] -- !agg_distinct_satisfy_gby_key_low_ndv -- PhysicalResultSink @@ -709,8 +700,7 @@ PhysicalResultSink ----PhysicalDistribute[DistributionSpecGather] ------PhysicalQuickSort[LOCAL_SORT] --------hashAgg[GLOBAL] -----------hashAgg[GLOBAL] -------------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id] +----------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id] -- !agg_distinct_satisfy_dst_key_low_ndv -- PhysicalResultSink @@ -720,41 +710,35 @@ PhysicalResultSink --------hashAgg[GLOBAL] ----------PhysicalDistribute[DistributionSpecHash] ------------hashAgg[LOCAL] ---------------hashAgg[GLOBAL] -----------------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id] +--------------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id] -- !agg_distinct_with_gby_key_with_other_func_low_ndv -- PhysicalResultSink --PhysicalQuickSort[MERGE_SORT] ----PhysicalDistribute[DistributionSpecGather] ------PhysicalQuickSort[LOCAL_SORT] ---------hashAgg[DISTINCT_GLOBAL] +--------hashAgg[GLOBAL] ----------PhysicalDistribute[DistributionSpecHash] -------------hashAgg[DISTINCT_LOCAL] ---------------hashAgg[GLOBAL] -----------------PhysicalDistribute[DistributionSpecHash] -------------------hashAgg[LOCAL] ---------------------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id] +------------hashAgg[LOCAL] +--------------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id] -- !agg_distinct_satisfy_gby_key_with_other_func_low_ndv -- PhysicalResultSink --PhysicalQuickSort[MERGE_SORT] ----PhysicalDistribute[DistributionSpecGather] ------PhysicalQuickSort[LOCAL_SORT] ---------hashAgg[DISTINCT_GLOBAL] -----------hashAgg[GLOBAL] -------------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id] +--------hashAgg[GLOBAL] +----------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id] -- !agg_distinct_satisfy_dst_key_with_other_func_low_ndv -- PhysicalResultSink --PhysicalQuickSort[MERGE_SORT] ----PhysicalDistribute[DistributionSpecGather] ------PhysicalQuickSort[LOCAL_SORT] ---------hashAgg[DISTINCT_GLOBAL] +--------hashAgg[GLOBAL] ----------PhysicalDistribute[DistributionSpecHash] -------------hashAgg[DISTINCT_LOCAL] ---------------hashAgg[GLOBAL] -----------------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id] +------------hashAgg[LOCAL] +--------------PhysicalOlapScan[t_gbykey_2_dstkey_10_30_id] -- !agg_distinct_without_gby_key_low_ndv -- PhysicalResultSink @@ -804,16 +788,16 @@ PhysicalResultSink 1 -- !with_gby_split_in_cascades -- -1 13.0 -1 18.0 -1 27.0 -1 33.0 -1 42.0 -1 55.0 -1 64.0 -1 76.0 -1 91.0 -1 100.0 +1 13 +1 18 +1 27 +1 33 +1 42 +1 55 +1 64 +1 76 +1 91 +1 100 -- !without_gby -- 10 diff --git a/regression-test/data/nereids_rules_p0/agg_strategy/distinct_agg_rewriter.out b/regression-test/data/nereids_rules_p0/agg_strategy/distinct_agg_rewriter.out index 18088719ce3..b386b2f510a 100644 --- a/regression-test/data/nereids_rules_p0/agg_strategy/distinct_agg_rewriter.out +++ b/regression-test/data/nereids_rules_p0/agg_strategy/distinct_agg_rewriter.out @@ -14,10 +14,9 @@ PhysicalResultSink PhysicalResultSink --PhysicalDistribute[DistributionSpecGather] ----hashAgg[GLOBAL] -------hashAgg[GLOBAL] ---------PhysicalDistribute[DistributionSpecHash] -----------hashAgg[LOCAL] -------------PhysicalOlapScan[t1000_2] +------PhysicalDistribute[DistributionSpecHash] +--------hashAgg[LOCAL] +----------PhysicalOlapScan[t1000_2] -- !use_multi_phase3 -- PhysicalResultSink @@ -25,10 +24,7 @@ PhysicalResultSink ----hashAgg[GLOBAL] ------PhysicalDistribute[DistributionSpecHash] --------hashAgg[LOCAL] -----------hashAgg[GLOBAL] -------------PhysicalDistribute[DistributionSpecHash] ---------------hashAgg[LOCAL] -----------------PhysicalOlapScan[t1000_2] +----------PhysicalOlapScan[t1000_2] -- !use_multi_distinct -- PhysicalResultSink diff --git a/regression-test/data/nereids_rules_p0/agg_strategy/distinct_agg_strategy_selector.out b/regression-test/data/nereids_rules_p0/agg_strategy/distinct_agg_strategy_selector.out index 05ef347e89b..7c37f7f73b2 100644 --- a/regression-test/data/nereids_rules_p0/agg_strategy/distinct_agg_strategy_selector.out +++ b/regression-test/data/nereids_rules_p0/agg_strategy/distinct_agg_strategy_selector.out @@ -57,19 +57,13 @@ PhysicalCteAnchor ( cteId=CTEId#0 ) --------hashAgg[GLOBAL] ----------PhysicalDistribute[DistributionSpecHash] ------------hashAgg[LOCAL] ---------------hashAgg[GLOBAL] -----------------PhysicalDistribute[DistributionSpecHash] -------------------hashAgg[LOCAL] ---------------------PhysicalDistribute[DistributionSpecExecutionAny] -----------------------PhysicalCteConsumer ( cteId=CTEId#0 ) +--------------PhysicalDistribute[DistributionSpecExecutionAny] +----------------PhysicalCteConsumer ( cteId=CTEId#0 ) --------hashAgg[GLOBAL] ----------PhysicalDistribute[DistributionSpecHash] ------------hashAgg[LOCAL] ---------------hashAgg[GLOBAL] -----------------PhysicalDistribute[DistributionSpecHash] -------------------hashAgg[LOCAL] ---------------------PhysicalDistribute[DistributionSpecExecutionAny] -----------------------PhysicalCteConsumer ( cteId=CTEId#0 ) +--------------PhysicalDistribute[DistributionSpecExecutionAny] +----------------PhysicalCteConsumer ( cteId=CTEId#0 ) -- !should_use_multi_distinct_with_group_by -- PhysicalResultSink diff --git a/regression-test/data/nereids_rules_p0/agg_strategy/physical_agg_regulator.out b/regression-test/data/nereids_rules_p0/agg_strategy/physical_agg_regulator.out index a33a83f01e0..8a6d35b0094 100644 --- a/regression-test/data/nereids_rules_p0/agg_strategy/physical_agg_regulator.out +++ b/regression-test/data/nereids_rules_p0/agg_strategy/physical_agg_regulator.out @@ -50,10 +50,7 @@ PhysicalResultSink ----hashAgg[GLOBAL] ------PhysicalDistribute[DistributionSpecHash] --------hashAgg[LOCAL] -----------hashAgg[GLOBAL] -------------PhysicalDistribute[DistributionSpecHash] ---------------hashAgg[LOCAL] -----------------PhysicalOlapScan[t1025] +----------PhysicalOlapScan[t1025] -- !split_multi_agg_use_three_phase -- PhysicalResultSink @@ -67,11 +64,8 @@ PhysicalResultSink -- !split_multi_agg_use_four_phase -- PhysicalResultSink --PhysicalDistribute[DistributionSpecGather] -----hashAgg[DISTINCT_GLOBAL] +----hashAgg[GLOBAL] ------PhysicalDistribute[DistributionSpecHash] ---------hashAgg[DISTINCT_LOCAL] -----------hashAgg[GLOBAL] -------------PhysicalDistribute[DistributionSpecHash] ---------------hashAgg[LOCAL] -----------------PhysicalOlapScan[t1025] +--------hashAgg[LOCAL] +----------PhysicalOlapScan[t1025] diff --git a/regression-test/data/nereids_rules_p0/distinct_split/disitinct_split.out b/regression-test/data/nereids_rules_p0/distinct_split/disitinct_split.out index 735f924821a..1975968f4de 100644 --- a/regression-test/data/nereids_rules_p0/distinct_split/disitinct_split.out +++ b/regression-test/data/nereids_rules_p0/distinct_split/disitinct_split.out @@ -200,18 +200,18 @@ 2 2 -- !00_avg -- -2.0 +2 -- !10_avg -- -2.0 1.5 +2 1.5 -- !01_avg -- -2.0 -2.0 +2 +2 -- !11_avg -- -2.0 1.0 -2.0 2.0 +2 1 +2 2 -- !count_sum_avg_no_gby -- 2 2 3.5 @@ -221,10 +221,10 @@ -- !count_sum_avg_with_gby -- 2 1 3.5 -2 1 4.0 +2 1 4 -- !count_multi_sum_avg_with_gby -- -2 2 4.0 +2 2 4 2 3 3.5 -- !multi_sum_has_upper -- @@ -494,14 +494,10 @@ PhysicalCteAnchor ( cteId=CTEId#0 ) ----hashJoin[INNER_JOIN] hashCondition=((c <=> .c)) otherCondition=() ------hashAgg[GLOBAL] --------hashAgg[LOCAL] -----------hashAgg[GLOBAL] -------------hashAgg[LOCAL] ---------------PhysicalCteConsumer ( cteId=CTEId#0 ) +----------PhysicalCteConsumer ( cteId=CTEId#0 ) ------hashAgg[GLOBAL] --------hashAgg[LOCAL] -----------hashAgg[GLOBAL] -------------hashAgg[LOCAL] ---------------PhysicalCteConsumer ( cteId=CTEId#0 ) +----------PhysicalCteConsumer ( cteId=CTEId#0 ) -- !multi_sum_with_gby -- PhysicalCteAnchor ( cteId=CTEId#0 ) @@ -511,14 +507,10 @@ PhysicalCteAnchor ( cteId=CTEId#0 ) ----hashJoin[INNER_JOIN] hashCondition=((c <=> .c)) otherCondition=() ------hashAgg[GLOBAL] --------hashAgg[LOCAL] -----------hashAgg[GLOBAL] -------------hashAgg[LOCAL] ---------------PhysicalCteConsumer ( cteId=CTEId#0 ) +----------PhysicalCteConsumer ( cteId=CTEId#0 ) ------hashAgg[GLOBAL] --------hashAgg[LOCAL] -----------hashAgg[GLOBAL] -------------hashAgg[LOCAL] ---------------PhysicalCteConsumer ( cteId=CTEId#0 ) +----------PhysicalCteConsumer ( cteId=CTEId#0 ) -- !sum_count_with_gby -- PhysicalCteAnchor ( cteId=CTEId#0 ) @@ -528,13 +520,10 @@ PhysicalCteAnchor ( cteId=CTEId#0 ) ----hashJoin[INNER_JOIN] hashCondition=((a <=> .a)) otherCondition=() ------hashAgg[GLOBAL] --------hashAgg[LOCAL] -----------hashAgg[GLOBAL] -------------hashAgg[LOCAL] ---------------PhysicalCteConsumer ( cteId=CTEId#0 ) +----------PhysicalCteConsumer ( cteId=CTEId#0 ) ------hashAgg[GLOBAL] ---------hashAgg[GLOBAL] -----------hashAgg[LOCAL] -------------PhysicalCteConsumer ( cteId=CTEId#0 ) +--------hashAgg[LOCAL] +----------PhysicalCteConsumer ( cteId=CTEId#0 ) -- !has_grouping -- PhysicalResultSink @@ -544,7 +533,7 @@ PhysicalResultSink --------PhysicalOlapScan[test_distinct_multi] -- !null_hash -- -1 \N 0 0.0 +1 \N 0 0 -- !same_distinct_arg -- 2 1 diff --git a/regression-test/data/shape_check/clickbench/query10.out b/regression-test/data/shape_check/clickbench/query10.out index c7840564369..ae9174ce1c1 100644 --- a/regression-test/data/shape_check/clickbench/query10.out +++ b/regression-test/data/shape_check/clickbench/query10.out @@ -4,9 +4,10 @@ PhysicalResultSink --PhysicalTopN[MERGE_SORT] ----PhysicalDistribute[DistributionSpecGather] ------PhysicalTopN[LOCAL_SORT] ---------hashAgg[GLOBAL] +--------hashAgg[DISTINCT_GLOBAL] ----------PhysicalDistribute[DistributionSpecHash] -------------hashAgg[LOCAL] ---------------PhysicalProject -----------------PhysicalOlapScan[hits] +------------hashAgg[DISTINCT_LOCAL] +--------------hashAgg[GLOBAL] +----------------PhysicalProject +------------------PhysicalOlapScan[hits] diff --git a/regression-test/data/shape_check/clickbench/query11.out b/regression-test/data/shape_check/clickbench/query11.out index 4b5e4486d3f..856e55187f9 100644 --- a/regression-test/data/shape_check/clickbench/query11.out +++ b/regression-test/data/shape_check/clickbench/query11.out @@ -7,7 +7,8 @@ PhysicalResultSink --------hashAgg[GLOBAL] ----------PhysicalDistribute[DistributionSpecHash] ------------hashAgg[LOCAL] ---------------PhysicalProject -----------------filter(( not (MobilePhoneModel = ''))) -------------------PhysicalOlapScan[hits] +--------------hashAgg[GLOBAL] +----------------PhysicalProject +------------------filter(( not (MobilePhoneModel = ''))) +--------------------PhysicalOlapScan[hits] diff --git a/regression-test/data/shape_check/clickbench/query12.out b/regression-test/data/shape_check/clickbench/query12.out index 10928363a83..d47a7e129e3 100644 --- a/regression-test/data/shape_check/clickbench/query12.out +++ b/regression-test/data/shape_check/clickbench/query12.out @@ -7,7 +7,8 @@ PhysicalResultSink --------hashAgg[GLOBAL] ----------PhysicalDistribute[DistributionSpecHash] ------------hashAgg[LOCAL] ---------------PhysicalProject -----------------filter(( not (MobilePhoneModel = ''))) -------------------PhysicalOlapScan[hits] +--------------hashAgg[GLOBAL] +----------------PhysicalProject +------------------filter(( not (MobilePhoneModel = ''))) +--------------------PhysicalOlapScan[hits] diff --git a/regression-test/data/shape_check/clickbench/query14.out b/regression-test/data/shape_check/clickbench/query14.out index 35eedce41b9..54afcc6268c 100644 --- a/regression-test/data/shape_check/clickbench/query14.out +++ b/regression-test/data/shape_check/clickbench/query14.out @@ -7,7 +7,8 @@ PhysicalResultSink --------hashAgg[GLOBAL] ----------PhysicalDistribute[DistributionSpecHash] ------------hashAgg[LOCAL] ---------------PhysicalProject -----------------filter(( not (SearchPhrase = ''))) -------------------PhysicalOlapScan[hits] +--------------hashAgg[GLOBAL] +----------------PhysicalProject +------------------filter(( not (SearchPhrase = ''))) +--------------------PhysicalOlapScan[hits] diff --git a/regression-test/data/shape_check/clickbench/query23.out b/regression-test/data/shape_check/clickbench/query23.out index 76a91b3ad49..5c6ed877934 100644 --- a/regression-test/data/shape_check/clickbench/query23.out +++ b/regression-test/data/shape_check/clickbench/query23.out @@ -7,7 +7,8 @@ PhysicalResultSink --------hashAgg[GLOBAL] ----------PhysicalDistribute[DistributionSpecHash] ------------hashAgg[LOCAL] ---------------PhysicalProject -----------------filter(( not (SearchPhrase = '')) and ( not (URL like '%.google.%')) and (Title like '%Google%')) -------------------PhysicalOlapScan[hits] +--------------hashAgg[GLOBAL] +----------------PhysicalProject +------------------filter(( not (SearchPhrase = '')) and ( not (URL like '%.google.%')) and (Title like '%Google%')) +--------------------PhysicalOlapScan[hits] diff --git a/regression-test/data/shape_check/clickbench/query9.out b/regression-test/data/shape_check/clickbench/query9.out index dcece9f0ce7..b35cb2e2a80 100644 --- a/regression-test/data/shape_check/clickbench/query9.out +++ b/regression-test/data/shape_check/clickbench/query9.out @@ -7,6 +7,7 @@ PhysicalResultSink --------hashAgg[GLOBAL] ----------PhysicalDistribute[DistributionSpecHash] ------------hashAgg[LOCAL] ---------------PhysicalProject -----------------PhysicalOlapScan[hits] +--------------hashAgg[GLOBAL] +----------------PhysicalProject +------------------PhysicalOlapScan[hits] diff --git a/regression-test/suites/nereids_rules_p0/agg_strategy/agg_strategy.groovy b/regression-test/suites/nereids_rules_p0/agg_strategy/agg_strategy.groovy index 72ee1b92efb..a5c136d074c 100644 --- a/regression-test/suites/nereids_rules_p0/agg_strategy/agg_strategy.groovy +++ b/regression-test/suites/nereids_rules_p0/agg_strategy/agg_strategy.groovy @@ -21,6 +21,7 @@ suite("agg_strategy") { sql "set global enable_auto_analyze=false" sql "set runtime_filter_mode=OFF" sql "set be_number_for_test=1;" + sql "set parallel_pipeline_task_num=1;" for (int i = 0; i < 2; i++) { if (i == 0) { diff --git a/regression-test/suites/nereids_rules_p0/agg_strategy/distinct_agg_rewriter.groovy b/regression-test/suites/nereids_rules_p0/agg_strategy/distinct_agg_rewriter.groovy index 953c233c0d5..476fa559ba6 100644 --- a/regression-test/suites/nereids_rules_p0/agg_strategy/distinct_agg_rewriter.groovy +++ b/regression-test/suites/nereids_rules_p0/agg_strategy/distinct_agg_rewriter.groovy @@ -21,6 +21,7 @@ suite("distinct_agg_rewriter") { set runtime_filter_mode=OFF; set enable_parallel_result_sink=false; set be_number_for_test=1; + set parallel_pipeline_task_num=1; """ multi_sql """ analyze table t1000_2 with sync; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
