This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch aggregates-rules in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit d1831fd91087cb8d586378759ce35864f714d98d Author: amashenkov <[email protected]> AuthorDate: Tue Apr 9 22:28:42 2024 +0300 Add sorted aggregate push down rule --- .../internal/sql/engine/prepare/PlannerPhase.java | 5 +- .../rel/agg/IgniteColocatedAggregateBase.java | 10 + .../sql/engine/rel/agg/MapReduceAggregates.java | 262 +++++++++++ .../rule/SortAggregateExchangeTransposeRule.java | 212 +++++++++ .../ignite/internal/sql/engine/util/HintUtils.java | 4 +- .../sql/engine/planner/AggregatePlannerTest.java | 31 +- .../planner/ColocatedHashAggregatePlannerTest.java | 1 + .../planner/ColocatedSortAggregatePlannerTest.java | 1 + .../planner/MapReduceHashAggregatePlannerTest.java | 1 + .../planner/MapReduceSortAggregatePlannerTest.java | 492 +++++++++------------ 10 files changed, 718 insertions(+), 301 deletions(-) diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java index 71229e08f0..e5092dd2b1 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java @@ -50,12 +50,14 @@ import org.apache.ignite.internal.sql.engine.rule.FilterConverterRule; import org.apache.ignite.internal.sql.engine.rule.FilterSpoolMergeToHashIndexSpoolRule; import org.apache.ignite.internal.sql.engine.rule.FilterSpoolMergeToSortedIndexSpoolRule; import org.apache.ignite.internal.sql.engine.rule.HashAggregateConverterRule; +import org.apache.ignite.internal.sql.engine.rule.HashAggregateExchangeTransposeRule; import org.apache.ignite.internal.sql.engine.rule.LogicalScanConverterRule; import org.apache.ignite.internal.sql.engine.rule.MergeJoinConverterRule; import org.apache.ignite.internal.sql.engine.rule.NestedLoopJoinConverterRule; import org.apache.ignite.internal.sql.engine.rule.ProjectConverterRule; import org.apache.ignite.internal.sql.engine.rule.SetOpConverterRule; import org.apache.ignite.internal.sql.engine.rule.SortAggregateConverterRule; +import org.apache.ignite.internal.sql.engine.rule.SortAggregateExchangeTransposeRule; import org.apache.ignite.internal.sql.engine.rule.SortConverterRule; import org.apache.ignite.internal.sql.engine.rule.SortExchangeTransposeRule; import org.apache.ignite.internal.sql.engine.rule.TableFunctionScanConverterRule; @@ -235,7 +237,8 @@ public enum PlannerPhase { HashAggregateConverterRule.COLOCATED, HashAggregateConverterRule.MAP_REDUCE, SortAggregateConverterRule.COLOCATED, - SortAggregateConverterRule.MAP_REDUCE, + SortAggregateExchangeTransposeRule.INSTANCE, + // SortAggregateConverterRule.MAP_REDUCE, SetOpConverterRule.COLOCATED_MINUS, SetOpConverterRule.MAP_REDUCE_MINUS, SetOpConverterRule.COLOCATED_INTERSECT, diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/IgniteColocatedAggregateBase.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/IgniteColocatedAggregateBase.java index 565193b70b..e1f42e90da 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/IgniteColocatedAggregateBase.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/IgniteColocatedAggregateBase.java @@ -84,6 +84,16 @@ public abstract class IgniteColocatedAggregateBase extends IgniteAggregate imple return List.of(Pair.of(nodeTraits.replace(inDistribution), inputTraits)); } + // Single group must have distribution of single. + if (groupSet.isEmpty()) { + return List.of( + Pair.of( + nodeTraits.replace(IgniteDistributions.single()), + List.of(inputTraits.get(0).replace(IgniteDistributions.single())) + ) + ); + } + // Otherwise if stream is distributed by hash function, and grouping columns is a super-set of // distribution keys, we can use colocated aggregate as well if (inDistribution.getType() == RelDistribution.Type.HASH_DISTRIBUTED) { diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/MapReduceAggregates.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/MapReduceAggregates.java index ec8c2819e4..1e7efc995f 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/MapReduceAggregates.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/MapReduceAggregates.java @@ -47,9 +47,11 @@ import org.apache.calcite.sql.type.SqlTypeUtil; import org.apache.calcite.util.ImmutableBitSet; import org.apache.calcite.util.mapping.Mapping; import org.apache.calcite.util.mapping.Mappings; +import org.apache.ignite.internal.sql.engine.rel.IgniteExchange; import org.apache.ignite.internal.sql.engine.rel.IgniteProject; import org.apache.ignite.internal.sql.engine.rel.IgniteRel; import org.apache.ignite.internal.sql.engine.sql.fun.IgniteSqlOperatorTable; +import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions; import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory; import org.apache.ignite.internal.sql.engine.util.Commons; import org.jetbrains.annotations.TestOnly; @@ -344,6 +346,265 @@ public class MapReduceAggregates { return new IgniteProject(agg.getCluster(), reduce.getTraitSet(), reduce, projectionList, agg.getRowType()); } + /** + * Converts the given colocated sorted aggregate over exchange operator into a MAP/REDUCE sorted aggregate implementation. + * + * @param agg Colocated sorted aggregate. + * @param exchange Exchange operator. + * @param builder Builder to create implementations of MAP and REDUCE phases. + * @param fieldMappingOnReduce Mapping to be applied to group sets on REDUCE phase. + * + * @return A physical node tree that implements the given logical operator. + */ + public static IgniteRel buildAggregates( + IgniteColocatedAggregateBase agg, + IgniteExchange exchange, + AggregateRelBuilder builder, + Mapping fieldMappingOnReduce + ) { + + // + // To implement MAP/REDUCE aggregate LogicalAggregate is transformed into + // a map aggregate node, a reduce aggregate node, and an optional project node + // (since some aggregate can be split into multiple ones, or require some additional work after REDUCE phase, + // to combine the results). + // + // SELECT c1, MIN(c2), COUNT(c3) FROM test GROUP BY c1, c2 + // + // MAP [c1, c2, map_agg1, map_agg2] + // REDUCE [c1, c2, reduce_agg1, reduce_agg2] + // PROJECT: [c1, c2, expr_agg1, expr_agg2] + // + // => + // + // {map: map_agg1, reduce: reduce_agg1, expr: expr_agg1, ..} + // {map: map_agg2, reduce: reduce_agg2, expr: expr_agg2, ..} + // + + // Create a list of descriptors for map/reduce version of the given arguments. + // This list is later used to create MAP/REDUCE version of each aggregate. + + List<MapReduceAgg> mapReduceAggs = new ArrayList<>(agg.getAggCallList().size()); + // groupSet includes all columns from GROUP BY/GROUPING SETS clauses. + int argumentOffset = agg.getGroupSet().cardinality(); + + // MAP PHASE AGGREGATE + + List<AggregateCall> mapAggCalls = new ArrayList<>(agg.getAggCallList().size()); + + RelNode input = exchange.getInput(); + for (AggregateCall call : agg.getAggCallList()) { + // See ReturnTypes::AVG_AGG_FUNCTION, Result type of a aggregate with no grouping or with filtering can be nullable. + boolean canBeNull = agg.getGroupCount() == 0 || call.hasFilter(); + + MapReduceAgg mapReduceAgg = createMapReduceAggCall( + Commons.cluster(), + call, + argumentOffset, + input.getRowType(), + canBeNull + ); + argumentOffset += mapReduceAgg.reduceCalls.size(); + mapReduceAggs.add(mapReduceAgg); + + mapAggCalls.addAll(mapReduceAgg.mapCalls); + } + + // MAP phase should have no less than the number of arguments as original aggregate. + // Otherwise there is a bug, because some aggregates were ignored. + assert mapAggCalls.size() >= agg.getAggCallList().size() : + format("The number of MAP aggregates is not correct. Original: {}\nMAP: {}", agg.getAggCallList(), mapAggCalls); + + RelNode map = builder.makeMapAgg( + agg.getCluster(), + input, + agg.getGroupSet(), + agg.getGroupSets(), + mapAggCalls + ); + + // + // REDUCE INPUT PROJECTION + // + + RelDataTypeFactory.Builder reduceType = new Builder(Commons.typeFactory()); + + int groupByColumns = agg.getGroupSet().cardinality(); + boolean sameAggsForBothPhases = true; + + // Build row type for input of REDUCE phase. + // It consists of columns from agg.groupSet and aggregate expressions. + + for (int i = 0; i < groupByColumns; i++) { + List<RelDataTypeField> outputRowFields = agg.getRowType().getFieldList(); + RelDataType type = outputRowFields.get(i).getType(); + reduceType.add("f" + reduceType.getFieldCount(), type); + } + + RexBuilder rexBuilder = agg.getCluster().getRexBuilder(); + IgniteTypeFactory typeFactory = (IgniteTypeFactory) agg.getCluster().getTypeFactory(); + + List<RexNode> reduceInputExprs = new ArrayList<>(); + + for (int i = 0; i < map.getRowType().getFieldList().size(); i++) { + RelDataType type = map.getRowType().getFieldList().get(i).getType(); + RexInputRef ref = new RexInputRef(i, type); + reduceInputExprs.add(ref); + } + + // Build a list of projections for reduce operator, + // if all projections are identity, it is not necessary + // to create a projection between MAP and REDUCE operators. + + boolean additionalProjectionsForReduce = false; + + for (int i = 0, argOffset = 0; i < mapReduceAggs.size(); i++) { + MapReduceAgg mapReduceAgg = mapReduceAggs.get(i); + int argIdx = groupByColumns + argOffset; + + for (int j = 0; j < mapReduceAgg.reduceCalls.size(); j++) { + RexNode projExpr = mapReduceAgg.makeReduceInputExpr.makeExpr(rexBuilder, map, List.of(argIdx), typeFactory); + reduceInputExprs.set(argIdx, projExpr); + + if (mapReduceAgg.makeReduceInputExpr != USE_INPUT_FIELD) { + additionalProjectionsForReduce = true; + } + + argIdx += 1; + } + + argOffset += mapReduceAgg.reduceCalls.size(); + } + + RelNode reduceInputNode; + if (additionalProjectionsForReduce) { + RelDataTypeFactory.Builder projectRow = new Builder(agg.getCluster().getTypeFactory()); + + for (int i = 0; i < reduceInputExprs.size(); i++) { + RexNode rexNode = reduceInputExprs.get(i); + projectRow.add(String.valueOf(i), rexNode.getType()); + } + + RelDataType projectRowType = projectRow.build(); + + reduceInputNode = builder.makeProject(agg.getCluster(), map, reduceInputExprs, projectRowType); + } else { + reduceInputNode = map; + } + + // ADD EXCHANGE + reduceInputNode = new IgniteExchange( + agg.getCluster(), + map.getTraitSet().replace(IgniteDistributions.single()), + reduceInputNode, + IgniteDistributions.single() + ); + + // + // REDUCE PHASE AGGREGATE + // + // Build a list of aggregate calls for REDUCE phase. + // Build a list of projections (arg-list, expr) that accept reduce phase and combine/collect/cast results. + + List<AggregateCall> reduceAggCalls = new ArrayList<>(); + List<Map.Entry<List<Integer>, MakeReduceExpr>> projection = new ArrayList<>(mapReduceAggs.size()); + + for (MapReduceAgg mapReduceAgg : mapReduceAggs) { + // Update row type returned by REDUCE node. + int i = 0; + for (AggregateCall reduceCall : mapReduceAgg.reduceCalls) { + reduceType.add("f" + i + "_" + reduceType.getFieldCount(), reduceCall.getType()); + reduceAggCalls.add(reduceCall); + i += 1; + } + + // Update projection list + List<Integer> reduceArgList = mapReduceAgg.argList; + MakeReduceExpr projectionExpr = mapReduceAgg.makeReduceOutputExpr; + projection.add(new SimpleEntry<>(reduceArgList, projectionExpr)); + + if (projectionExpr != USE_INPUT_FIELD) { + sameAggsForBothPhases = false; + } + } + + RelDataType reduceTypeToUse; + if (sameAggsForBothPhases) { + reduceTypeToUse = agg.getRowType(); + } else { + reduceTypeToUse = reduceType.build(); + } + + // if the number of aggregates on MAP phase is larger then the number of aggregates on REDUCE phase, + // assume that some of MAP aggregates are not used by REDUCE phase and this is a bug. + // + // NOTE: In general case REDUCE phase can use more aggregates than MAP phase, + // but at the moment there is no support for such aggregates. + assert mapAggCalls.size() <= reduceAggCalls.size() : + format("The number of MAP/REDUCE aggregates is not correct. MAP: {}\nREDUCE: {}", mapAggCalls, reduceAggCalls); + + // Apply mapping to groupSet/groupSets on REDUCE phase. + ImmutableBitSet groupSetOnReduce = Mappings.apply(fieldMappingOnReduce, agg.getGroupSet()); + List<ImmutableBitSet> groupSetsOnReduce = agg.getGroupSets().stream() + .map(g -> Mappings.apply(fieldMappingOnReduce, g)) + .collect(Collectors.toList()); + + IgniteRel reduce = builder.makeReduceAgg( + agg.getCluster(), + reduceInputNode, + groupSetOnReduce, + groupSetsOnReduce, + reduceAggCalls, + reduceTypeToUse + ); + + // + // FINAL PROJECTION + // + // if aggregate MAP phase uses the same aggregates as REDUCE phase, + // there is no need to add a projection because no additional actions are required to compute final results. + if (sameAggsForBothPhases) { + return reduce; + } + + List<RexNode> projectionList = new ArrayList<>(projection.size() + groupByColumns); + + // Projection list returned by AggregateNode consists of columns from GROUP BY clause + // and expressions that represent aggregate calls. + // In case of MAP/REDUCE those expressions should compute final results for each MAP/REDUCE aggregate. + + int i = 0; + for (; i < groupByColumns; i++) { + List<RelDataTypeField> outputRowFields = agg.getRowType().getFieldList(); + RelDataType type = outputRowFields.get(i).getType(); + RexInputRef ref = new RexInputRef(i, type); + projectionList.add(ref); + } + + for (Map.Entry<List<Integer>, MakeReduceExpr> expr : projection) { + RexNode resultExpr = expr.getValue().makeExpr(rexBuilder, reduce, expr.getKey(), typeFactory); + projectionList.add(resultExpr); + } + + assert projectionList.size() == agg.getRowType().getFieldList().size() : + format("Projection size does not match. Expected: {} but got {}", + agg.getRowType().getFieldList().size(), projectionList.size()); + + for (i = 0; i < projectionList.size(); i++) { + RexNode resultExpr = projectionList.get(i); + List<RelDataTypeField> outputRowFields = agg.getRowType().getFieldList(); + + // Put assertion here so we can see an expression that caused a type mismatch, + // since Project::isValid only shows types. + assert resultExpr.getType().equals(outputRowFields.get(i).getType()) : + format("Type at position#{} does not match. Expected: {} but got {}.\nREDUCE aggregates: {}\nRow: {}.\nExpr: {}", + i, resultExpr.getType(), outputRowFields.get(i).getType(), reduceAggCalls, outputRowFields, resultExpr); + + } + + return new IgniteProject(agg.getCluster(), reduce.getTraitSet(), reduce, projectionList, agg.getRowType()); + } + /** * Creates a MAP/REDUCE details for this call. */ @@ -671,4 +932,5 @@ public class MapReduceAggregates { reduceOutputExpr ); } + } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/SortAggregateExchangeTransposeRule.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/SortAggregateExchangeTransposeRule.java new file mode 100644 index 0000000000..f9ee9ef9e3 --- /dev/null +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/SortAggregateExchangeTransposeRule.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine.rule; + +import static org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates.canBeImplementedAsMapReduce; +import static org.apache.ignite.internal.sql.engine.trait.IgniteDistributions.single; +import static org.apache.ignite.internal.sql.engine.trait.TraitUtils.distribution; +import static org.apache.ignite.internal.sql.engine.util.PlanUtils.complexDistinctAgg; +import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty; + +import java.util.List; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelDistribution.Type; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.calcite.util.mapping.Mapping; +import org.apache.ignite.internal.sql.engine.rel.IgniteConvention; +import org.apache.ignite.internal.sql.engine.rel.IgniteExchange; +import org.apache.ignite.internal.sql.engine.rel.IgniteProject; +import org.apache.ignite.internal.sql.engine.rel.IgniteRel; +import org.apache.ignite.internal.sql.engine.rel.agg.IgniteColocatedSortAggregate; +import org.apache.ignite.internal.sql.engine.rel.agg.IgniteMapSortAggregate; +import org.apache.ignite.internal.sql.engine.rel.agg.IgniteReduceSortAggregate; +import org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates; +import org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates.AggregateRelBuilder; +import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution; +import org.apache.ignite.internal.sql.engine.util.Commons; +import org.apache.ignite.internal.sql.engine.util.HintUtils; +import org.immutables.value.Value; + +/** + * A rule that pushes {@link IgniteColocatedSortAggregate} node under {@link IgniteExchange} if possible, otherwise, splits into map-reduce + * phases. + */ [email protected] +public class SortAggregateExchangeTransposeRule extends RelRule<SortAggregateExchangeTransposeRule.Config> { + public static final RelOptRule INSTANCE = SortAggregateExchangeTransposeRule.Config.INSTANCE.toRule(); + + SortAggregateExchangeTransposeRule(SortAggregateExchangeTransposeRule.Config cfg) { + super(cfg); + } + + @Override + public boolean matches(RelOptRuleCall call) { + IgniteColocatedSortAggregate aggregate = call.rel(0); + IgniteExchange exchange = call.rel(1); + + // Conditions already checked by SortAggregateConverterRule + assert !HintUtils.isExpandDistinctAggregate(aggregate); + assert aggregate.getGroupSets().size() == 1; + + return hashAlike(distribution(exchange.getInput())) + && exchange.distribution() == single() + && (canBePushedDown(aggregate, exchange) || canBeConvertedToMapReduce(aggregate)); + } + + /** + * Returns {@code true} if an aggregate groups by distribution columns and therefore, can be pushed down under Exchange as-is, + * {@code false} otherwise. + */ + private static boolean canBePushedDown(IgniteColocatedSortAggregate aggregate, IgniteExchange exchange) { + return distribution(exchange.getInput()).getType() == Type.HASH_DISTRIBUTED + && !nullOrEmpty(aggregate.getGroupSet()) + && ImmutableBitSet.of(distribution(exchange.getInput()).getKeys()).equals(aggregate.getGroupSet()); + } + + /** + * Returns {@code true} if an aggregate can be converted to map-reduce phases, {@code false} otherwise. + */ + private static boolean canBeConvertedToMapReduce(IgniteColocatedSortAggregate aggregate) { + return canBeImplementedAsMapReduce(aggregate.getAggCallList()) + && !complexDistinctAgg(aggregate.getAggCallList()); + } + + private static boolean hashAlike(IgniteDistribution distribution) { + return distribution.getType() == Type.HASH_DISTRIBUTED + || distribution.getType() == Type.RANDOM_DISTRIBUTED; + } + + @Override + public void onMatch(RelOptRuleCall call) { + IgniteColocatedSortAggregate agg = call.rel(0); + IgniteExchange exchange = call.rel(1); + + RelOptCluster cluster = agg.getCluster(); + RelCollation collation = agg.collation(); + + if (canBePushedDown(agg, exchange)) { + assert agg.getGroupSets().size() == 1; + assert !agg.collation().getKeys().isEmpty(); + + IgniteExchange relNode = new IgniteExchange( + cluster, + exchange.getTraitSet().replace(collation), + new IgniteColocatedSortAggregate( + cluster, + agg.getTraitSet() + .replace(collation) + .replace(distribution(exchange.getInput())), + exchange.getInput(), + agg.getGroupSet(), + agg.getGroupSets(), + agg.getAggCallList() + ), + exchange.distribution() + ); + + call.transformTo(relNode); + + return; + } + + // Create mapping to adjust fields on REDUCE phase. + Mapping fieldMappingOnReduce = Commons.trimmingMapping(agg.getGroupSet().length(), agg.getGroupSet()); + + // Adjust columns in output collation. + RelCollation outputCollation = collation.apply(fieldMappingOnReduce); + + RelTraitSet inTraits = cluster.traitSetOf(IgniteConvention.INSTANCE).replace(collation); + RelTraitSet outTraits = cluster.traitSetOf(IgniteConvention.INSTANCE).replace(outputCollation); + + AggregateRelBuilder relBuilder = new AggregateRelBuilder() { + @Override + public IgniteRel makeMapAgg(RelOptCluster cluster, RelNode input, ImmutableBitSet groupSet, + List<ImmutableBitSet> groupSets, List<AggregateCall> aggregateCalls) { + + return new IgniteMapSortAggregate( + cluster, + outTraits.replace(distribution(input)), + convert(input, inTraits.replace(distribution(input))), + groupSet, + groupSets, + aggregateCalls, + collation + ); + } + + @Override + public IgniteRel makeProject(RelOptCluster cluster, RelNode input, List<RexNode> reduceInputExprs, + RelDataType projectRowType) { + return new IgniteProject(agg.getCluster(), + input.getTraitSet(), + input, + reduceInputExprs, + projectRowType + ); + } + + @Override + public IgniteRel makeReduceAgg(RelOptCluster cluster, RelNode input, ImmutableBitSet groupSet, + List<ImmutableBitSet> groupSets, List<AggregateCall> aggregateCalls, RelDataType outputType) { + assert distribution(input) == single(); + + return new IgniteReduceSortAggregate( + cluster, + outTraits.replace(single()), + convert(input, outTraits.replace(single())), + groupSet, + groupSets, + aggregateCalls, + outputType, + collation + ); + } + }; + + call.transformTo(MapReduceAggregates.buildAggregates(agg, exchange, relBuilder, fieldMappingOnReduce)); + } + + /** Configuration. */ + @SuppressWarnings({"ClassNameSameAsAncestorName", "InnerClassFieldHidesOuterClassField"}) + @Value.Immutable + public interface Config extends RelRule.Config { + SortAggregateExchangeTransposeRule.Config INSTANCE = ImmutableSortAggregateExchangeTransposeRule.Config.of() + .withDescription("SortAggregateExchangeTransposeRule") + .withOperandSupplier(o0 -> + o0.operand(IgniteColocatedSortAggregate.class) + .oneInput(o1 -> + o1.operand(IgniteExchange.class) + .anyInputs())) + .as(SortAggregateExchangeTransposeRule.Config.class); + + /** {@inheritDoc} */ + @Override + default SortAggregateExchangeTransposeRule toRule() { + return new SortAggregateExchangeTransposeRule(this); + } + } +} diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HintUtils.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HintUtils.java index 1f5770097b..1d9da87526 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HintUtils.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HintUtils.java @@ -25,10 +25,10 @@ import java.util.List; import java.util.StringJoiner; import java.util.stream.Collectors; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Aggregate; import org.apache.calcite.rel.core.AggregateCall; import org.apache.calcite.rel.hint.Hintable; import org.apache.calcite.rel.hint.RelHint; -import org.apache.calcite.rel.logical.LogicalAggregate; import org.apache.ignite.internal.sql.engine.hint.IgniteHint; /** @@ -44,7 +44,7 @@ public class HintUtils { * * @param rel Logical aggregate to check on expand distinct aggregate hint. */ - public static boolean isExpandDistinctAggregate(LogicalAggregate rel) { + public static boolean isExpandDistinctAggregate(Aggregate rel) { return rel.getHints().stream() .anyMatch(r -> r.hintName.equals(EXPAND_DISTINCT_AGG.name())) && rel.getAggCallList().stream().anyMatch(AggregateCall::isDistinct); diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java index ded0e04155..b5511c2068 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java @@ -315,30 +315,19 @@ public class AggregatePlannerTest extends AbstractAggregatePlannerTest { )) )); - assertPlan(TestCase.CASE_17A, - hasChildThat(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class) - .and(input(1, isInstanceOf(IgniteColocatedHashAggregate.class) - .and(input(isInstanceOf(IgniteLimit.class) - .and(input(isInstanceOf(IgniteExchange.class) - .and(input(isInstanceOf(IgniteSort.class) - .and(input(isTableScan("TEST"))) - )) - )) - )) - )) - )); - assertPlan(TestCase.CASE_17B, - hasChildThat(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class) - .and(input(1, isInstanceOf(IgniteColocatedHashAggregate.class) - .and(input(isInstanceOf(IgniteLimit.class) - .and(input(isInstanceOf(IgniteExchange.class) - .and(input(isInstanceOf(IgniteSort.class) - .and(input(isTableScan("TEST"))) - )) + Predicate<RelNode> aggregateWithLimit = hasChildThat(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class) + .and(input(1, isInstanceOf(IgniteColocatedHashAggregate.class) + .and(input(isInstanceOf(IgniteLimit.class) + .and(input(isInstanceOf(IgniteExchange.class) + .and(input(isInstanceOf(IgniteSort.class) + .and(input(isTableScan("TEST"))) )) )) )) - )); + )) + ); + assertPlan(TestCase.CASE_17A, aggregateWithLimit); + assertPlan(TestCase.CASE_17B, aggregateWithLimit); } /** diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java index fec1474ed0..7f6833443e 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java @@ -49,6 +49,7 @@ public class ColocatedHashAggregatePlannerTest extends AbstractAggregatePlannerT private final String[] disableRules = { "MapReduceHashAggregateConverterRule", + "SortAggregateExchangeTransposeRule", "MapReduceSortAggregateConverterRule", "ColocatedSortAggregateConverterRule" }; diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java index 8953888c80..3a92ca2dbf 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java @@ -49,6 +49,7 @@ public class ColocatedSortAggregatePlannerTest extends AbstractAggregatePlannerT private final String[] disableRules = { "MapReduceHashAggregateConverterRule", "MapReduceSortAggregateConverterRule", + "SortAggregateExchangeTransposeRule", "ColocatedHashAggregateConverterRule" }; diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java index 644718598f..52d4333dc6 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java @@ -49,6 +49,7 @@ import org.junit.jupiter.api.Test; */ public class MapReduceHashAggregatePlannerTest extends AbstractAggregatePlannerTest { private final String[] disableRules = { + "SortAggregateExchangeTransposeRule", "MapReduceSortAggregateConverterRule", "ColocatedHashAggregateConverterRule", "ColocatedSortAggregateConverterRule" diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java index 14d8da0739..76fca16b6c 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java @@ -26,15 +26,17 @@ import java.util.function.Predicate; import org.apache.calcite.rel.RelCollation; import org.apache.calcite.rel.RelCollations; import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.Aggregate; import org.apache.calcite.rel.core.AggregateCall; -import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.core.TableScan; import org.apache.ignite.internal.sql.engine.rel.IgniteCorrelatedNestedLoopJoin; import org.apache.ignite.internal.sql.engine.rel.IgniteExchange; +import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan; import org.apache.ignite.internal.sql.engine.rel.IgniteLimit; import org.apache.ignite.internal.sql.engine.rel.IgniteMergeJoin; import org.apache.ignite.internal.sql.engine.rel.IgniteProject; import org.apache.ignite.internal.sql.engine.rel.IgniteSort; +import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan; +import org.apache.ignite.internal.sql.engine.rel.agg.IgniteColocatedSortAggregate; import org.apache.ignite.internal.sql.engine.rel.agg.IgniteMapSortAggregate; import org.apache.ignite.internal.sql.engine.rel.agg.IgniteReduceSortAggregate; import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Collation; @@ -52,7 +54,6 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT private final String[] disableRules = { "MapReduceHashAggregateConverterRule", "ColocatedHashAggregateConverterRule", - "ColocatedSortAggregateConverterRule" }; /** @@ -125,10 +126,10 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT checkSimpleAggWithGroupByHash(TestCase.CASE_5B); checkSimpleAggWithGroupByHash(TestCase.CASE_6B); - checkSimpleAggWithGroupByHash(TestCase.CASE_5C); - checkSimpleAggWithGroupByHash(TestCase.CASE_6C); + checkSimpleAggWithColocatedGroupByHash(TestCase.CASE_5C); + checkSimpleAggWithColocatedGroupByHash(TestCase.CASE_6C); - checkSimpleAggWithGroupByHash(TestCase.CASE_5D); + checkSimpleAggWithColocatedGroupByHash(TestCase.CASE_5D); } /** @@ -150,13 +151,13 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT checkDistinctAggWithGroupByHash(TestCase.CASE_7_2B); checkDistinctAggWithGroupByHash(TestCase.CASE_7_3B); - checkDistinctAggWithGroupByHash(TestCase.CASE_7_1C); - checkDistinctAggWithGroupByHash(TestCase.CASE_7_2C); - checkDistinctAggWithGroupByHash(TestCase.CASE_7_3C); + checkDistinctAggWithColocatedGroupByHash(TestCase.CASE_7_1C); + checkDistinctAggWithColocatedGroupByHash(TestCase.CASE_7_2C); + checkDistinctAggWithColocatedGroupByHash(TestCase.CASE_7_3C); - checkDistinctAggWithGroupByHash(TestCase.CASE_7_1D); - checkDistinctAggWithGroupByHash(TestCase.CASE_7_2D); - checkDistinctAggWithGroupByHash(TestCase.CASE_7_3D); + checkDistinctAggWithColocatedGroupByHash(TestCase.CASE_7_1D); + checkDistinctAggWithColocatedGroupByHash(TestCase.CASE_7_2D); + checkDistinctAggWithColocatedGroupByHash(TestCase.CASE_7_3D); } /** @@ -175,11 +176,11 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT checkSimpleAggWithGroupByHash(TestCase.CASE_8_1B); checkSimpleAggWithGroupByHash(TestCase.CASE_8_2B); - checkSimpleAggWithGroupByHash(TestCase.CASE_8_1C); - checkSimpleAggWithGroupByHash(TestCase.CASE_8_2C); + checkSimpleAggWithColocatedGroupByHash(TestCase.CASE_8_1C); + checkSimpleAggWithColocatedGroupByHash(TestCase.CASE_8_2C); - checkSimpleAggWithGroupByHash(TestCase.CASE_8_1D); - checkSimpleAggWithGroupByHash(TestCase.CASE_8_2D); + checkSimpleAggWithColocatedGroupByHash(TestCase.CASE_8_1D); + checkSimpleAggWithColocatedGroupByHash(TestCase.CASE_8_2D); } /** @@ -201,11 +202,11 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT checkAggWithGroupByIndexColumnsHash(TestCase.CASE_10B); checkAggWithGroupByIndexColumnsHash(TestCase.CASE_11B); - checkAggWithGroupByIndexColumnsHash(TestCase.CASE_9C); - checkAggWithGroupByIndexColumnsHash(TestCase.CASE_10C); - checkAggWithGroupByIndexColumnsHash(TestCase.CASE_11C); + checkAggWithColocatedGroupByIndexColumnsHash(TestCase.CASE_9C); + checkAggWithColocatedGroupByIndexColumnsHash(TestCase.CASE_10C); + checkAggWithColocatedGroupByIndexColumnsHash(TestCase.CASE_11C); - checkAggWithGroupByIndexColumnsHash(TestCase.CASE_9D); + checkAggWithColocatedGroupByIndexColumnsHash(TestCase.CASE_9D); } /** @@ -231,8 +232,8 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT checkGroupWithNoAggregateUseIndexHash(TestCase.CASE_13A); checkGroupWithNoAggregateUseIndexHash(TestCase.CASE_13B); - checkGroupWithNoAggregateUseIndexHash(TestCase.CASE_13C); - checkGroupWithNoAggregateUseIndexHash(TestCase.CASE_13D); + checkColocatedGroupWithNoAggregateUseIndexHash(TestCase.CASE_13C); + checkColocatedGroupWithNoAggregateUseIndexHash(TestCase.CASE_13D); } /** @@ -263,36 +264,22 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT String[] additionalRulesToDisable = {"NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", "CorrelateToNestedLoopRule"}; assertPlan(TestCase.CASE_16, - nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class) - .and(input(isInstanceOf(IgniteMapSortAggregate.class) - .and(input(isIndexScan("TEST", "idx_val0"))) - )) + nodeOrAnyChild(isInstanceOf(IgniteColocatedSortAggregate.class) + .and(input(isIndexScan("TEST", "idx_val0"))) ), ArrayUtils.concat(disableRules, additionalRulesToDisable) ); - assertPlan(TestCase.CASE_16A, - nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class) - .and(input(isInstanceOf(IgniteExchange.class) - .and(hasDistribution(single())) - .and(input(isInstanceOf(IgniteMapSortAggregate.class) - .and(input(isIndexScan("TEST", "idx_val0"))) - )) - )) - ), - ArrayUtils.concat(disableRules, additionalRulesToDisable) - ); - assertPlan(TestCase.CASE_16B, - nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class) - .and(input(isInstanceOf(IgniteExchange.class) - .and(hasDistribution(single())) - .and(input(isInstanceOf(IgniteMapSortAggregate.class) - .and(input(isIndexScan("TEST", "idx_val0"))) - )) + Predicate<RelNode> aggregateOverIndexScan = nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class) + .and(input(isInstanceOf(IgniteExchange.class) + .and(hasDistribution(single())) + .and(input(isInstanceOf(IgniteMapSortAggregate.class) + .and(input(isIndexScan("TEST", "idx_val0"))) )) - ), - ArrayUtils.concat(disableRules, additionalRulesToDisable) + )) ); + assertPlan(TestCase.CASE_16A, aggregateOverIndexScan, ArrayUtils.concat(disableRules, additionalRulesToDisable)); + assertPlan(TestCase.CASE_16B, aggregateOverIndexScan, ArrayUtils.concat(disableRules, additionalRulesToDisable)); } /** @@ -302,12 +289,10 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT public void emptyCollationPassThroughLimit() throws Exception { assertPlan(TestCase.CASE_17, hasChildThat(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class) - .and(input(1, isInstanceOf(IgniteReduceSortAggregate.class) - .and(input(isInstanceOf(IgniteMapSortAggregate.class) - .and(input(isInstanceOf(IgniteLimit.class) - .and(input(isInstanceOf(IgniteSort.class) - .and(input(isTableScan("TEST"))) - )) + .and(input(1, isInstanceOf(IgniteColocatedSortAggregate.class) + .and(input(isInstanceOf(IgniteLimit.class) + .and(input(isInstanceOf(IgniteSort.class) + .and(input(isTableScan("TEST"))) )) )) )) @@ -315,40 +300,21 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT disableRules ); - assertPlan(TestCase.CASE_17A, - hasChildThat(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class) - .and(input(1, isInstanceOf(IgniteReduceSortAggregate.class) - .and(input(isInstanceOf(IgniteMapSortAggregate.class) - .and(input(isInstanceOf(IgniteLimit.class) - .and(input(isInstanceOf(IgniteExchange.class) - .and(hasDistribution(single())) - .and(input(isInstanceOf(IgniteSort.class) - .and(input(isTableScan("TEST"))) - )) - )) - )) - )) - )) - ), - disableRules - ); - assertPlan(TestCase.CASE_17B, - hasChildThat(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class) - .and(input(1, isInstanceOf(IgniteReduceSortAggregate.class) - .and(input(isInstanceOf(IgniteMapSortAggregate.class) - .and(input(isInstanceOf(IgniteLimit.class) - .and(input(isInstanceOf(IgniteExchange.class) - .and(hasDistribution(single())) - .and(input(isInstanceOf(IgniteSort.class) - .and(input(isTableScan("TEST"))) - )) - )) + Predicate<RelNode> aggregateWithLimit = hasChildThat(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class) + .and(input(1, isInstanceOf(IgniteColocatedSortAggregate.class) + .and(input(isInstanceOf(IgniteLimit.class) + .and(input(isInstanceOf(IgniteExchange.class) + .and(hasDistribution(single())) + .and(input(isInstanceOf(IgniteSort.class) + .and(input(isTableScan("TEST"))) )) )) )) - ), - disableRules + )) ); + + assertPlan(TestCase.CASE_17A, aggregateWithLimit, disableRules); + assertPlan(TestCase.CASE_17B, aggregateWithLimit, disableRules); } /** @@ -400,21 +366,15 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT */ @Test public void expandDistinctAggregates() throws Exception { - Predicate<? extends RelNode> subtreePredicate = isInstanceOf(IgniteReduceSortAggregate.class) + Predicate<? extends RelNode> subtreePredicate = isInstanceOf(IgniteColocatedSortAggregate.class) // Check the second aggregation step contains accumulators. // Plan must not contain distinct accumulators. .and(hasAggregate()) .and(not(hasDistinctAggregate())) - .and(input(isInstanceOf(IgniteMapSortAggregate.class) // Check the first aggregation step is SELECT DISTINCT (doesn't contain any accumulators) - .and(input(isInstanceOf(IgniteReduceSortAggregate.class) + .and(input(isInstanceOf(IgniteColocatedSortAggregate.class) .and(not(hasAggregate())) .and(hasGroups()) - .and(input(isInstanceOf(IgniteMapSortAggregate.class) - .and(not(hasAggregate())) - .and(hasGroups()) - )) - )) )); assertPlan(TestCase.CASE_21, nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class) @@ -422,25 +382,25 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT .and(input(1, subtreePredicate)) ), disableRules); - subtreePredicate = isInstanceOf(IgniteReduceSortAggregate.class) + subtreePredicate = nodeOrAnyChild(isInstanceOf(IgniteColocatedSortAggregate.class) // Check the second aggregation step contains accumulators. // Plan must not contain distinct accumulators. .and(hasAggregate()) .and(not(hasDistinctAggregate())) - .and(input(isInstanceOf(IgniteMapSortAggregate.class) - // Check the first aggregation step is SELECT DISTINCT (doesn't contain any accumulators) - .and(input(isInstanceOf(IgniteReduceSortAggregate.class) - .and(not(hasAggregate())) - .and(hasGroups()) - .and(input(isInstanceOf(IgniteExchange.class) - .and(hasDistribution(single())) - .and(input(isInstanceOf(IgniteMapSortAggregate.class) - .and(not(hasAggregate())) - .and(hasGroups()) - )) + // Check the first aggregation step is SELECT DISTINCT (doesn't contain any accumulators) + .and(input(isInstanceOf(IgniteReduceSortAggregate.class) + .and(not(hasAggregate())) + .and(hasGroups()) + .and(input(isInstanceOf(IgniteExchange.class) + .and(hasDistribution(single())) + .and(input(isInstanceOf(IgniteMapSortAggregate.class) + .and(not(hasAggregate())) + .and(hasGroups()) + .and(input(isInstanceOf(IgniteIndexScan.class))) )) )) - )); + )) + ); assertPlan(TestCase.CASE_21A, nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class) .and(input(0, subtreePredicate)) @@ -473,10 +433,17 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT )) )); + Predicate<IgniteExchange> colocated = isInstanceOf(IgniteExchange.class) + .and(hasDistribution(single())) + .and(input(isInstanceOf(IgniteColocatedSortAggregate.class) + .and(in -> hasAggregates(countMap).test(in.getAggCallList())) + .and(input(isInstanceOf(IgniteSort.class))) + )); + assertPlan(TestCase.CASE_22, nonColocated, disableRules); assertPlan(TestCase.CASE_22A, nonColocated, disableRules); - assertPlan(TestCase.CASE_22B, nonColocated, disableRules); - assertPlan(TestCase.CASE_22C, nonColocated, disableRules); + assertPlan(TestCase.CASE_22B, colocated, disableRules); + assertPlan(TestCase.CASE_22C, colocated, disableRules); } /** @@ -485,6 +452,9 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT */ @Test public void twoPhaseAvgAgg() throws Exception { + Predicate<AggregateCall> avgColocated = (a) -> + Objects.equals(a.getAggregation().getName(), "AVG") && a.getArgList().equals(List.of(1)); + Predicate<AggregateCall> sumMap = (a) -> Objects.equals(a.getAggregation().getName(), "SUM") && a.getArgList().equals(List.of(1)); @@ -499,19 +469,26 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT Predicate<RelNode> nonColocated = hasChildThat(isInstanceOf(IgniteReduceSortAggregate.class) .and(in -> hasAggregates(sumReduce, sum0Reduce).test(in.getAggregateCalls())) - .and(input(isInstanceOf(IgniteProject.class) - .and(input(isInstanceOf(IgniteExchange.class) + .and(input(isInstanceOf(IgniteExchange.class) .and(hasDistribution(single())) - .and(input(isInstanceOf(IgniteMapSortAggregate.class) + .and(input(isInstanceOf(IgniteProject.class) + .and(input(isInstanceOf(IgniteMapSortAggregate.class) .and(in -> hasAggregates(sumMap, countMap).test(in.getAggCallList())) - ) + )) )) - )))); + ))); + + Predicate<RelNode> colocated = nodeOrAnyChild(isInstanceOf(IgniteExchange.class) + .and(hasDistribution(single())) + .and(input(isInstanceOf(IgniteColocatedSortAggregate.class) + .and(in -> hasAggregates(avgColocated).test(in.getAggCallList())) + .and(input(isInstanceOf(IgniteSort.class))) + ))); assertPlan(TestCase.CASE_23, nonColocated, disableRules); assertPlan(TestCase.CASE_23A, nonColocated, disableRules); - assertPlan(TestCase.CASE_23B, nonColocated, disableRules); - assertPlan(TestCase.CASE_23C, nonColocated, disableRules); + assertPlan(TestCase.CASE_23B, colocated, disableRules); + assertPlan(TestCase.CASE_23C, colocated, disableRules); } /** @@ -519,23 +496,10 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT */ @Test public void countDistinctGroupSetSingle() throws Exception { - Predicate<IgniteReduceSortAggregate> inputAgg = isInstanceOf(IgniteReduceSortAggregate.class) - .and(hasGroupSets(IgniteReduceSortAggregate::getGroupSets, 0)) - .and(hasCollation(RelCollations.of(0))) - .and(input(isInstanceOf(IgniteMapSortAggregate.class) - .and(hasCollation(RelCollations.of(1))) - .and(hasGroupSets(Aggregate::getGroupSets, 1)) - )); - - assertPlan(TestCase.CASE_24_1, nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class) - .and(hasNoGroupSets(IgniteReduceSortAggregate::getGroupSets)) + assertPlan(TestCase.CASE_24_1, nodeOrAnyChild(isInstanceOf(IgniteColocatedSortAggregate.class) + .and(hasNoGroupSets(IgniteColocatedSortAggregate::getGroupSets)) .and(hasCollation(RelCollations.EMPTY)) - .and(input(isInstanceOf(IgniteMapSortAggregate.class) - .and(hasNoGroupSets(IgniteMapSortAggregate::getGroupSets)) - .and(hasCollation(RelCollations.EMPTY)) - .and(input(isInstanceOf(IgniteProject.class).and(input(inputAgg))) - )) - )), + .and(input(isInstanceOf(IgniteTableScan.class)))), disableRules); } @@ -572,14 +536,10 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT } private void checkSimpleAggSingle(TestCase testCase) throws Exception { - assertPlan(testCase, - nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class) - .and(input(isInstanceOf(IgniteMapSortAggregate.class) - .and(hasAggregate()) - .and(hasGroups()) - .and(input(isTableScan("TEST"))) - )) - ), + assertPlan(testCase, nodeOrAnyChild(isInstanceOf(IgniteColocatedSortAggregate.class) + .and(hasAggregate()) + .and(hasGroups()) + .and(input(isTableScan("TEST")))), disableRules ); } @@ -602,13 +562,11 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT private void checkSimpleAggWithGroupBySingle(TestCase testCase) throws Exception { assertPlan(testCase, - nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class) - .and(input(isInstanceOf(IgniteMapSortAggregate.class) - .and(hasAggregate()) - .and(hasGroups()) - .and(input(isInstanceOf(IgniteSort.class) - .and(input(isTableScan("TEST"))) - )) + nodeOrAnyChild(isInstanceOf(IgniteColocatedSortAggregate.class) + .and(hasAggregate()) + .and(hasGroups()) + .and(input(isInstanceOf(IgniteSort.class) + .and(input(isTableScan("TEST"))) )) ), disableRules @@ -633,23 +591,15 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT ); } - private void checkDistinctAggSingle(TestCase testCase) throws Exception { + private void checkSimpleAggWithColocatedGroupByHash(TestCase testCase) throws Exception { assertPlan(testCase, - nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class) - .and(hasAggregate()) - .and(not(hasDistinctAggregate())) - .and(input(isInstanceOf(IgniteMapSortAggregate.class) + nodeOrAnyChild(isInstanceOf(IgniteExchange.class) + .and(hasDistribution(single())) + .and(input(isInstanceOf(IgniteColocatedSortAggregate.class) .and(hasAggregate()) - .and(not(hasDistinctAggregate())) - .and(input(isInstanceOf(IgniteReduceSortAggregate.class) - .and(not(hasAggregate())) - .and(hasGroups()) - .and(input(isInstanceOf(IgniteMapSortAggregate.class) - .and(not(hasAggregate())) - .and(input(isInstanceOf(IgniteSort.class) - .and(input(isTableScan("TEST"))) - )) - )) + .and(hasGroups()) + .and(input(isInstanceOf(IgniteSort.class) + .and(input(isTableScan("TEST"))) )) )) ), @@ -657,40 +607,53 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT ); } + private void checkDistinctAggSingle(TestCase testCase) throws Exception { + assertPlan(testCase, + nodeOrAnyChild(isInstanceOf(IgniteColocatedSortAggregate.class) + .and((hasDistinctAggregate())) + // Without GROUP BY sort can be omitted. + .and(input(isTableScan("TEST"))) + ), + disableRules + ); + } + private void checkDistinctAggHash(TestCase testCase) throws Exception { - assertPlan(testCase, nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class) - .and(hasAggregate()) - .and(not(hasDistinctAggregate())) - .and(input(isInstanceOf(IgniteMapSortAggregate.class) - .and(hasAggregate()) - .and(not(hasDistinctAggregate())) - .and(input(isInstanceOf(IgniteReduceSortAggregate.class) - .and(not(hasAggregate())) - .and(hasGroups()) - .and(input(isInstanceOf(IgniteExchange.class) - .and(hasDistribution(single())) - .and(input(isInstanceOf(IgniteMapSortAggregate.class) - .and(not(hasAggregate())) - .and(input(isInstanceOf(IgniteSort.class) - .and(input(isTableScan("TEST"))) - )) - )) - )) - )) - ))), + assertPlan(testCase, + nodeOrAnyChild(isInstanceOf(IgniteColocatedSortAggregate.class) + .and(hasDistinctAggregate()) + .and(hasGroups()) + .and(input(isInstanceOf(IgniteExchange.class) + .and(hasDistribution(single())) + .and(input(isTableScan("TEST"))) + )) + ), disableRules ); } private void checkDistinctAggWithGroupBySingle(TestCase testCase) throws Exception { assertPlan(testCase, - nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class) + nodeOrAnyChild(isInstanceOf(IgniteColocatedSortAggregate.class) + .and(hasDistinctAggregate()) + .and(hasGroups()) + .and(input(isInstanceOf(IgniteSort.class) + .and(input(isTableScan("TEST"))) + ))), + disableRules + ); + } + + private void checkDistinctAggWithGroupByHash(TestCase testCase) throws Exception { + assertPlan(testCase, + nodeOrAnyChild(isInstanceOf(IgniteColocatedSortAggregate.class) .and(hasAggregate()) .and(not(hasDistinctAggregate())) - .and(input(isInstanceOf(IgniteMapSortAggregate.class) - .and(input(isInstanceOf(IgniteReduceSortAggregate.class) - .and(not(hasAggregate())) - .and(hasGroups()) + .and(input(isInstanceOf(IgniteReduceSortAggregate.class) + .and(not(hasAggregate())) + .and(hasGroups()) + .and(input(isInstanceOf(IgniteExchange.class) + .and(hasDistribution(single())) .and(input(isInstanceOf(IgniteMapSortAggregate.class) .and(not(hasAggregate())) .and(hasGroups()) @@ -700,46 +663,31 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT )) )) )) - ), disableRules ); } - private void checkDistinctAggWithGroupByHash(TestCase testCase) throws Exception { + private void checkDistinctAggWithColocatedGroupByHash(TestCase testCase) throws Exception { assertPlan(testCase, - nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class) - .and(hasAggregate()) - .and(not(hasDistinctAggregate())) - .and(input(isInstanceOf(IgniteMapSortAggregate.class) - .and(input(isInstanceOf(IgniteReduceSortAggregate.class) - .and(not(hasAggregate())) - .and(hasGroups()) - .and(input(isInstanceOf(IgniteExchange.class) - .and(hasDistribution(single())) - .and(input(isInstanceOf(IgniteMapSortAggregate.class) - .and(not(hasAggregate())) - .and(hasGroups()) - .and(input(isInstanceOf(IgniteSort.class) - .and(input(isTableScan("TEST"))) - )) - )) - )) + nodeOrAnyChild(isInstanceOf(IgniteExchange.class) + .and(hasDistribution(single())) + .and(input(isInstanceOf(IgniteColocatedSortAggregate.class) + .and(hasDistinctAggregate()) + .and(hasGroups()) + .and(input(isInstanceOf(IgniteSort.class) + .and(input(isTableScan("TEST"))) )) - )) - - ), + ))), disableRules ); } private void checkAggWithGroupByIndexColumnsSingle(TestCase testCase) throws Exception { assertPlan(testCase, - nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class) - .and(input(isInstanceOf(IgniteMapSortAggregate.class) - .and(hasAggregate()) - .and(input(isIndexScan("TEST", "idx_grp0_grp1"))) - )) + nodeOrAnyChild(isInstanceOf(IgniteColocatedSortAggregate.class) + .and(hasAggregate()) + .and(input(isIndexScan("TEST", "idx_grp0_grp1"))) ), disableRules ); @@ -760,17 +708,27 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT ); } + private void checkAggWithColocatedGroupByIndexColumnsHash(TestCase testCase) throws Exception { + assertPlan(testCase, + nodeOrAnyChild(isInstanceOf(IgniteExchange.class) + .and(hasDistribution(single())) + .and(input(isInstanceOf(IgniteProject.class) + .and(input(isInstanceOf(IgniteColocatedSortAggregate.class) + .and(hasAggregate()) + .and(input(isIndexScan("TEST", "idx_grp0_grp1"))) + )) + ))), + disableRules + ); + } + private void checkGroupWithNoAggregateSingle(TestCase testCase) throws Exception { assertPlan(testCase, - nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class) + nodeOrAnyChild(isInstanceOf(IgniteColocatedSortAggregate.class) .and(not(hasAggregate())) .and(hasGroups()) - .and(input(isInstanceOf(IgniteMapSortAggregate.class) - .and(not(hasAggregate())) - .and(hasGroups()) - .and(input(isInstanceOf(IgniteSort.class) - .and(input(isTableScan("TEST"))) - )) + .and(input(isInstanceOf(IgniteSort.class) + .and(input(isTableScan("TEST"))) )) ), disableRules @@ -799,14 +757,10 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT private void checkGroupWithNoAggregateUseIndexSingle(TestCase testCase) throws Exception { assertPlan(testCase, - nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class) + nodeOrAnyChild(isInstanceOf(IgniteColocatedSortAggregate.class) .and(not(hasAggregate())) .and(hasGroups()) - .and(input(isInstanceOf(IgniteMapSortAggregate.class) - .and(not(hasAggregate())) - .and(hasGroups()) - .and(input(isIndexScan("TEST", "idx_grp0_grp1"))) - )) + .and(input(isIndexScan("TEST", "idx_grp0_grp1"))) ), disableRules ); @@ -830,14 +784,26 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT ); } + private void checkColocatedGroupWithNoAggregateUseIndexHash(TestCase testCase) throws Exception { + assertPlan(testCase, + nodeOrAnyChild(isInstanceOf(IgniteExchange.class) + .and(hasDistribution(single())) + .and(input(isInstanceOf(IgniteColocatedSortAggregate.class) + .and(not(hasAggregate())) + .and(hasGroups()) + .and(input(isIndexScan("TEST", "idx_grp0_grp1"))) + )) + ), + disableRules + ); + } + private void checkGroupsWithOrderBySubsetOfGroupColumnsSingle(TestCase testCase, RelCollation collation) throws Exception { assertPlan(testCase, - nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class) - .and(input(isInstanceOf(IgniteMapSortAggregate.class) - .and(input(isInstanceOf(IgniteSort.class) - .and(s -> s.collation().equals(collation)) - .and(input(isTableScan("TEST"))) - )) + nodeOrAnyChild(isInstanceOf(IgniteColocatedSortAggregate.class) + .and(input(isInstanceOf(IgniteSort.class) + .and(s -> s.collation().equals(collation)) + .and(input(isTableScan("TEST"))) ))), disableRules ); @@ -863,14 +829,10 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT assertPlan(testCase, isInstanceOf(IgniteSort.class) .and(s -> s.collation().equals(TraitUtils.createCollation(List.of(0, 1, 2)))) - .and(input(isInstanceOf(IgniteProject.class) - .and(input(isInstanceOf(IgniteReduceSortAggregate.class) - .and(input(isInstanceOf(IgniteMapSortAggregate.class) - .and(input(isInstanceOf(IgniteSort.class) - .and(s -> s.collation().equals(collation)) - .and(input(isTableScan("TEST") - )) - )) + .and(input(isInstanceOf(IgniteColocatedSortAggregate.class) + .and(input(isInstanceOf(IgniteSort.class) + .and(s -> s.collation().equals(collation)) + .and(input(isTableScan("TEST") )) )) )), @@ -901,41 +863,27 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT } private void checkCountDistinctHash(TestCase testCase) throws Exception { - Predicate<IgniteReduceSortAggregate> inputAgg = isInstanceOf(IgniteReduceSortAggregate.class) - .and(hasGroupSets(IgniteReduceSortAggregate::getGroupSets, 0)) - .and(hasCollation(RelCollations.of(0))) - .and(input(isInstanceOf(IgniteExchange.class) - .and(hasDistribution(single())) - .and(hasCollation(RelCollations.of(0))) - .and(input(isInstanceOf(IgniteMapSortAggregate.class) - .and(hasGroupSets(Aggregate::getGroupSets, 1)) - )) - )); - - assertPlan(testCase, nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class) - .and(hasNoGroupSets(IgniteReduceSortAggregate::getGroupSets)) + // Aggregate with single group doesn't require an additional sort. + // Map-reduce with additional sort is much costly. + assertPlan(testCase, nodeOrAnyChild(isInstanceOf(IgniteColocatedSortAggregate.class) + .and(hasNoGroupSets(IgniteColocatedSortAggregate::getGroupSets)) .and(hasCollation(RelCollations.EMPTY)) - .and(input(isInstanceOf(IgniteMapSortAggregate.class) - .and(hasNoGroupSets(IgniteMapSortAggregate::getGroupSets)) - .and(hasCollation(RelCollations.EMPTY)) - .and(input(isInstanceOf(IgniteProject.class).and(input(inputAgg))) - )) - )), + .and(input(isInstanceOf(IgniteExchange.class) + .and(hasDistribution(single())) + .and(input(isInstanceOf(TableScan.class))) + )) + ), disableRules); } private void checkDerivedCollationWithOrderByGroupColumnSingle(TestCase testCase) throws Exception { RelCollation requiredCollation = RelCollations.of(TraitUtils.createFieldCollation(0, Collation.DESC_NULLS_FIRST)); - assertPlan(testCase, nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class) + assertPlan(testCase, nodeOrAnyChild(isInstanceOf(IgniteColocatedSortAggregate.class) .and(hasAggregate()) - .and(input(isInstanceOf(IgniteMapSortAggregate.class) - .and(hasAggregate()) - .and(input(isInstanceOf(IgniteSort.class) - .and(hasCollation(requiredCollation)) - )) + .and(input(isInstanceOf(IgniteSort.class) + .and(hasCollation(requiredCollation)) )) - ), disableRules); } @@ -961,17 +909,10 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT TraitUtils.createFieldCollation(1, Collation.DESC_NULLS_FIRST), TraitUtils.createFieldCollation(0, Collation.ASC_NULLS_LAST) ); - RelCollation outputCollation = RelCollations.of( - TraitUtils.createFieldCollation(1, Collation.DESC_NULLS_FIRST) - ); - assertPlan(testCase, isInstanceOf(Project.class) - .and(hasCollation(outputCollation)) - .and(input(isInstanceOf(IgniteReduceSortAggregate.class) - .and(input(isInstanceOf(IgniteMapSortAggregate.class) - .and(hasChildThat(hasCollation(requiredCollation))) - )) - )), + assertPlan(testCase, isInstanceOf(IgniteColocatedSortAggregate.class) + .and(hasCollation(requiredCollation)) + .and(hasChildThat(hasCollation(requiredCollation))), disableRules); } @@ -980,13 +921,10 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT TraitUtils.createFieldCollation(1, Collation.DESC_NULLS_FIRST), TraitUtils.createFieldCollation(0, Collation.ASC_NULLS_LAST) ); - RelCollation outputCollation = RelCollations.of( - TraitUtils.createFieldCollation(1, Collation.DESC_NULLS_FIRST) - ); assertPlan(testCase, isInstanceOf(IgniteProject.class) - .and(hasCollation(outputCollation)) + .and(hasCollation(requiredCollation)) .and(input(isInstanceOf(IgniteReduceSortAggregate.class) .and(input(isInstanceOf(IgniteExchange.class) .and(hasDistribution(single()))
