This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch aggregates-rules in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit d386dcd4e7ce0954de01089be4c7481a5fa8c2ce Author: amashenkov <[email protected]> AuthorDate: Mon Apr 15 21:35:04 2024 +0300 Add hash aggregate push down rule --- .../internal/sql/engine/prepare/PlannerPhase.java | 3 +- .../engine/rule/HashAggregateConverterRule.java | 23 +- .../rule/HashAggregateExchangeTransposeRule.java | 201 +++++++++ .../planner/ColocatedHashAggregatePlannerTest.java | 1 + .../planner/ColocatedSortAggregatePlannerTest.java | 1 + .../planner/MapReduceHashAggregatePlannerTest.java | 465 ++++++++++----------- .../planner/MapReduceSortAggregatePlannerTest.java | 1 + 7 files changed, 445 insertions(+), 250 deletions(-) diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java index e5092dd2b1..352172a051 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java @@ -235,7 +235,8 @@ public enum PlannerPhase { LogicalScanConverterRule.TABLE_SCAN, LogicalScanConverterRule.SYSTEM_VIEW_SCAN, HashAggregateConverterRule.COLOCATED, - HashAggregateConverterRule.MAP_REDUCE, + // HashAggregateConverterRule.MAP_REDUCE, + HashAggregateExchangeTransposeRule.INSTANCE, SortAggregateConverterRule.COLOCATED, SortAggregateExchangeTransposeRule.INSTANCE, // SortAggregateConverterRule.MAP_REDUCE, diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateConverterRule.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateConverterRule.java index 6542b6fc03..549394ce90 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateConverterRule.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateConverterRule.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.PhysicalNode; import org.apache.calcite.rel.RelNode; @@ -64,6 +65,13 @@ public class HashAggregateConverterRule { super(LogicalAggregate.class, "ColocatedHashAggregateConverterRule"); } + @Override + public boolean matches(RelOptRuleCall call) { + LogicalAggregate aggregate = call.rel(0); + + return !HintUtils.isExpandDistinctAggregate(aggregate); + } + /** {@inheritDoc} */ @Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq, @@ -93,16 +101,19 @@ public class HashAggregateConverterRule { super(LogicalAggregate.class, "MapReduceHashAggregateConverterRule"); } + @Override + public boolean matches(RelOptRuleCall call) { + LogicalAggregate aggregate = call.rel(0); + + return !HintUtils.isExpandDistinctAggregate(aggregate) + && canBeImplementedAsMapReduce(aggregate.getAggCallList()) + && !complexDistinctAgg(aggregate.getAggCallList()); + } + /** {@inheritDoc} */ @Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq, LogicalAggregate agg) { - if (complexDistinctAgg(agg.getAggCallList()) - || !canBeImplementedAsMapReduce(agg.getAggCallList()) - || HintUtils.isExpandDistinctAggregate(agg)) { - return null; - } - RelOptCluster cluster = agg.getCluster(); RelTraitSet inTrait = cluster.traitSetOf(IgniteConvention.INSTANCE); RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateExchangeTransposeRule.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateExchangeTransposeRule.java new file mode 100644 index 0000000000..d1da074376 --- /dev/null +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateExchangeTransposeRule.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine.rule; + +import static org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates.canBeImplementedAsMapReduce; +import static org.apache.ignite.internal.sql.engine.trait.IgniteDistributions.single; +import static org.apache.ignite.internal.sql.engine.trait.TraitUtils.distribution; +import static org.apache.ignite.internal.sql.engine.util.PlanUtils.complexDistinctAgg; +import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty; + +import java.util.List; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelDistribution.Type; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.calcite.util.mapping.Mapping; +import org.apache.ignite.internal.sql.engine.rel.IgniteConvention; +import org.apache.ignite.internal.sql.engine.rel.IgniteExchange; +import org.apache.ignite.internal.sql.engine.rel.IgniteProject; +import org.apache.ignite.internal.sql.engine.rel.IgniteRel; +import org.apache.ignite.internal.sql.engine.rel.agg.IgniteColocatedHashAggregate; +import org.apache.ignite.internal.sql.engine.rel.agg.IgniteMapHashAggregate; +import org.apache.ignite.internal.sql.engine.rel.agg.IgniteReduceHashAggregate; +import org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates; +import org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates.AggregateRelBuilder; +import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions; +import org.apache.ignite.internal.sql.engine.util.Commons; +import org.apache.ignite.internal.sql.engine.util.HintUtils; +import org.immutables.value.Value; + +/** + * A rule that pushes {@link IgniteColocatedHashAggregate} node under {@link IgniteExchange} if possible, otherwise, splits into map-reduce + * phases. + */ [email protected] +public class HashAggregateExchangeTransposeRule extends RelRule<HashAggregateExchangeTransposeRule.Config> { + public static final RelOptRule INSTANCE = HashAggregateExchangeTransposeRule.Config.INSTANCE.toRule(); + + HashAggregateExchangeTransposeRule(HashAggregateExchangeTransposeRule.Config cfg) { + super(cfg); + } + + @Override + public boolean matches(RelOptRuleCall call) { + IgniteColocatedHashAggregate aggregate = call.rel(0); + IgniteExchange exchange = call.rel(1); + + // Conditions already checked by IgniteColocatedHashAggregate + assert !HintUtils.isExpandDistinctAggregate(aggregate); + + return exchange.distribution() == single() + && (canBePushedDown(aggregate, exchange) || canBeConvertedToMapReduce(aggregate)); + } + + /** + * Returns {@code true} if an aggregate groups by distribution columns and therefore, can be pushed down under Exchange as-is, + * {@code false} otherwise. + */ + private static boolean canBePushedDown(IgniteColocatedHashAggregate aggregate, IgniteExchange exchange) { + return distribution(exchange.getInput()).getType() == Type.HASH_DISTRIBUTED + && !nullOrEmpty(aggregate.getGroupSet()) + && ImmutableBitSet.of(distribution(exchange.getInput()).getKeys()).equals(aggregate.getGroupSet()); + } + + /** + * Returns {@code true} if an aggregate can be converted to map-reduce phases, {@code false} otherwise. + */ + private static boolean canBeConvertedToMapReduce(IgniteColocatedHashAggregate aggregate) { + return canBeImplementedAsMapReduce(aggregate.getAggCallList()) + && !complexDistinctAgg(aggregate.getAggCallList()); + } + + @Override + public void onMatch(RelOptRuleCall call) { + IgniteColocatedHashAggregate agg = call.rel(0); + IgniteExchange exchange = call.rel(1); + + RelOptCluster cluster = agg.getCluster(); + + if (canBePushedDown(agg, exchange)) { + assert agg.getGroupSets().size() == 1; + assert agg.collation().getKeys().isEmpty(); + + IgniteExchange relNode = new IgniteExchange( + cluster, + agg.getTraitSet(), + new IgniteColocatedHashAggregate( + cluster, + agg.getTraitSet() + .replace(distribution(exchange.getInput())), + exchange.getInput(), + agg.getGroupSet(), + agg.getGroupSets(), + agg.getAggCallList() + ), + exchange.distribution() + ); + + call.transformTo(relNode); + + return; + } + + // Create mapping to adjust fields on REDUCE phase. + Mapping fieldMappingOnReduce = Commons.trimmingMapping(agg.getGroupSet().length(), agg.getGroupSet()); + + RelTraitSet inTrait = cluster.traitSetOf(IgniteConvention.INSTANCE); + RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE); + + RelTraitSet reducePhaseTraits = outTrait.replace(IgniteDistributions.single()); + + AggregateRelBuilder relBuilder = new AggregateRelBuilder() { + @Override + public IgniteRel makeMapAgg(RelOptCluster cluster, RelNode input, ImmutableBitSet groupSet, + List<ImmutableBitSet> groupSets, List<AggregateCall> aggregateCalls) { + + return new IgniteMapHashAggregate( + cluster, + outTrait.replace(IgniteDistributions.random()), + input, + groupSet, + groupSets, + aggregateCalls + ); + } + + @Override + public IgniteRel makeProject(RelOptCluster cluster, RelNode input, List<RexNode> reduceInputExprs, + RelDataType projectRowType) { + + return new IgniteProject( + agg.getCluster(), + input.getTraitSet(), + input, + reduceInputExprs, + projectRowType + ); + } + + @Override + public IgniteRel makeReduceAgg(RelOptCluster cluster, RelNode input, ImmutableBitSet groupSet, + List<ImmutableBitSet> groupSets, List<AggregateCall> aggregateCalls, RelDataType outputType) { + assert distribution(input) == single(); + + return new IgniteReduceHashAggregate( + cluster, + reducePhaseTraits, + convert(input, inTrait.replace(single())), + groupSet, + groupSets, + aggregateCalls, + outputType + ); + } + }; + + call.transformTo(MapReduceAggregates.buildAggregates(agg, exchange, relBuilder, fieldMappingOnReduce)); + } + + /** Configuration. */ + @SuppressWarnings({"ClassNameSameAsAncestorName", "InnerClassFieldHidesOuterClassField"}) + @Value.Immutable + public interface Config extends RelRule.Config { + HashAggregateExchangeTransposeRule.Config INSTANCE = ImmutableHashAggregateExchangeTransposeRule.Config.of() + .withDescription("HashAggregateExchangeTransposeRule") + .withOperandSupplier(o0 -> + o0.operand(IgniteColocatedHashAggregate.class) + .oneInput(o1 -> + o1.operand(IgniteExchange.class) + .anyInputs())) + .as(HashAggregateExchangeTransposeRule.Config.class); + + /** {@inheritDoc} */ + @Override + default HashAggregateExchangeTransposeRule toRule() { + return new HashAggregateExchangeTransposeRule(this); + } + } +} diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java index 7f6833443e..d5c184bbbf 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java @@ -48,6 +48,7 @@ import org.junit.jupiter.api.Test; public class ColocatedHashAggregatePlannerTest extends AbstractAggregatePlannerTest { private final String[] disableRules = { + "HashAggregateExchangeTransposeRule", "MapReduceHashAggregateConverterRule", "SortAggregateExchangeTransposeRule", "MapReduceSortAggregateConverterRule", diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java index 3a92ca2dbf..38219bda43 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java @@ -47,6 +47,7 @@ import org.junit.jupiter.api.Test; */ public class ColocatedSortAggregatePlannerTest extends AbstractAggregatePlannerTest { private final String[] disableRules = { + "HashAggregateExchangeTransposeRule", "MapReduceHashAggregateConverterRule", "MapReduceSortAggregateConverterRule", "SortAggregateExchangeTransposeRule", diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java index 52d4333dc6..ba05af4de4 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java @@ -33,6 +33,8 @@ import org.apache.ignite.internal.sql.engine.rel.IgniteLimit; import org.apache.ignite.internal.sql.engine.rel.IgniteMergeJoin; import org.apache.ignite.internal.sql.engine.rel.IgniteProject; import org.apache.ignite.internal.sql.engine.rel.IgniteSort; +import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan; +import org.apache.ignite.internal.sql.engine.rel.agg.IgniteColocatedHashAggregate; import org.apache.ignite.internal.sql.engine.rel.agg.IgniteMapHashAggregate; import org.apache.ignite.internal.sql.engine.rel.agg.IgniteReduceHashAggregate; import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Collation; @@ -51,7 +53,6 @@ public class MapReduceHashAggregatePlannerTest extends AbstractAggregatePlannerT private final String[] disableRules = { "SortAggregateExchangeTransposeRule", "MapReduceSortAggregateConverterRule", - "ColocatedHashAggregateConverterRule", "ColocatedSortAggregateConverterRule" }; @@ -81,11 +82,11 @@ public class MapReduceHashAggregatePlannerTest extends AbstractAggregatePlannerT checkDistinctAggHash(TestCase.CASE_2_1B); checkDistinctAggHash(TestCase.CASE_2_2B); - checkDistinctAggHash(TestCase.CASE_2_1C); - checkDistinctAggHash(TestCase.CASE_2_2C); + checkColocatedDistinctAggHash(TestCase.CASE_2_1C); + checkColocatedDistinctAggHash(TestCase.CASE_2_2C); - checkDistinctAggHash(TestCase.CASE_2_1D); - checkDistinctAggHash(TestCase.CASE_2_2D); + checkColocatedDistinctAggHash(TestCase.CASE_2_1D); + checkColocatedDistinctAggHash(TestCase.CASE_2_2D); } /** @@ -125,10 +126,10 @@ public class MapReduceHashAggregatePlannerTest extends AbstractAggregatePlannerT checkSimpleAggWithGroupByHash(TestCase.CASE_5B); checkSimpleAggWithGroupByHash(TestCase.CASE_6B); - checkSimpleAggWithGroupByHash(TestCase.CASE_5C); - checkSimpleAggWithGroupByHash(TestCase.CASE_6C); + checkSimpleAggWithColocatedGroupByHash(TestCase.CASE_5C); + checkSimpleAggWithColocatedGroupByHash(TestCase.CASE_6C); - checkSimpleAggWithGroupByHash(TestCase.CASE_5D); + checkSimpleAggWithColocatedGroupByHash(TestCase.CASE_5D); } /** @@ -175,11 +176,11 @@ public class MapReduceHashAggregatePlannerTest extends AbstractAggregatePlannerT checkSimpleAggWithGroupByHash(TestCase.CASE_8_1B); checkSimpleAggWithGroupByHash(TestCase.CASE_8_2B); - checkSimpleAggWithGroupByHash(TestCase.CASE_8_1C); - checkSimpleAggWithGroupByHash(TestCase.CASE_8_2C); + checkSimpleAggWithColocatedGroupByHash(TestCase.CASE_8_1C); + checkSimpleAggWithColocatedGroupByHash(TestCase.CASE_8_2C); - checkSimpleAggWithGroupByHash(TestCase.CASE_8_1D); - checkSimpleAggWithGroupByHash(TestCase.CASE_8_2D); + checkSimpleAggWithColocatedGroupByHash(TestCase.CASE_8_1D); + checkSimpleAggWithColocatedGroupByHash(TestCase.CASE_8_2D); } /** @@ -201,11 +202,11 @@ public class MapReduceHashAggregatePlannerTest extends AbstractAggregatePlannerT checkAggWithGroupByIndexColumnsHash(TestCase.CASE_10B); checkAggWithGroupByIndexColumnsHash(TestCase.CASE_11B); - checkAggWithGroupByIndexColumnsHash(TestCase.CASE_9C); - checkAggWithGroupByIndexColumnsHash(TestCase.CASE_10C); - checkAggWithGroupByIndexColumnsHash(TestCase.CASE_11C); + checkAggWithColocatedGroupByIndexColumnsHash(TestCase.CASE_9C); + checkAggWithColocatedGroupByIndexColumnsHash(TestCase.CASE_10C); + checkAggWithColocatedGroupByIndexColumnsHash(TestCase.CASE_11C); - checkAggWithGroupByIndexColumnsHash(TestCase.CASE_9D); + checkAggWithColocatedGroupByIndexColumnsHash(TestCase.CASE_9D); } /** @@ -217,8 +218,9 @@ public class MapReduceHashAggregatePlannerTest extends AbstractAggregatePlannerT checkGroupWithNoAggregateHash(TestCase.CASE_12A); checkGroupWithNoAggregateHash(TestCase.CASE_12B); - checkGroupWithNoAggregateHash(TestCase.CASE_12C); - checkGroupWithNoAggregateHash(TestCase.CASE_12D); + + checkColocatedGroupWithNoAggregateHash(TestCase.CASE_12C); + checkColocatedGroupWithNoAggregateHash(TestCase.CASE_12D); } /** @@ -230,8 +232,9 @@ public class MapReduceHashAggregatePlannerTest extends AbstractAggregatePlannerT checkGroupWithNoAggregateHash(TestCase.CASE_13A); checkGroupWithNoAggregateHash(TestCase.CASE_13B); - checkGroupWithNoAggregateHash(TestCase.CASE_13C); - checkGroupWithNoAggregateHash(TestCase.CASE_13D); + + checkColocatedGroupWithNoAggregateHash(TestCase.CASE_13C); + checkColocatedGroupWithNoAggregateHash(TestCase.CASE_13D); } /** @@ -263,39 +266,24 @@ public class MapReduceHashAggregatePlannerTest extends AbstractAggregatePlannerT assertPlan(TestCase.CASE_16, nodeOrAnyChild(isInstanceOf(IgniteSort.class) - .and(input(isInstanceOf(IgniteReduceHashAggregate.class) - .and(input(isInstanceOf(IgniteMapHashAggregate.class) - .and(input(isTableScan("TEST"))) - )) + .and(input(isInstanceOf(IgniteColocatedHashAggregate.class) + .and(input(isTableScan("TEST"))) )) ), ArrayUtils.concat(disableRules, additionalRulesToDisable) ); - assertPlan(TestCase.CASE_16A, - nodeOrAnyChild(isInstanceOf(IgniteSort.class) - .and(input(isInstanceOf(IgniteReduceHashAggregate.class) - .and(input(isInstanceOf(IgniteExchange.class) - .and(input(isInstanceOf(IgniteMapHashAggregate.class) - .and(input(isTableScan("TEST"))) - )) - )) - )) - ), - ArrayUtils.concat(disableRules, additionalRulesToDisable) - ); - assertPlan(TestCase.CASE_16B, - nodeOrAnyChild(isInstanceOf(IgniteSort.class) - .and(input(isInstanceOf(IgniteReduceHashAggregate.class) - .and(input(isInstanceOf(IgniteExchange.class) - .and(input(isInstanceOf(IgniteMapHashAggregate.class) - .and(input(isTableScan("TEST"))) - )) + Predicate<RelNode> aggregateOverIndexScan = nodeOrAnyChild(isInstanceOf(IgniteSort.class) + .and(input(isInstanceOf(IgniteReduceHashAggregate.class) + .and(input(isInstanceOf(IgniteExchange.class) + .and(input(isInstanceOf(IgniteMapHashAggregate.class) + .and(input(isTableScan("TEST"))) )) )) - ), - ArrayUtils.concat(disableRules, additionalRulesToDisable) + )) ); + assertPlan(TestCase.CASE_16A, aggregateOverIndexScan, ArrayUtils.concat(disableRules, additionalRulesToDisable)); + assertPlan(TestCase.CASE_16B, aggregateOverIndexScan, ArrayUtils.concat(disableRules, additionalRulesToDisable)); } /** @@ -305,12 +293,10 @@ public class MapReduceHashAggregatePlannerTest extends AbstractAggregatePlannerT public void emptyCollationPassThroughLimit() throws Exception { assertPlan(TestCase.CASE_17, hasChildThat(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class) - .and(input(1, isInstanceOf(IgniteReduceHashAggregate.class) - .and(input(isInstanceOf(IgniteMapHashAggregate.class) - .and(input(isInstanceOf(IgniteLimit.class) - .and(input(isInstanceOf(IgniteSort.class) - .and(input(isTableScan("TEST"))) - )) + .and(input(1, isInstanceOf(IgniteColocatedHashAggregate.class) + .and(input(isInstanceOf(IgniteLimit.class) + .and(input(isInstanceOf(IgniteSort.class) + .and(input(isTableScan("TEST"))) )) )) )) @@ -318,38 +304,19 @@ public class MapReduceHashAggregatePlannerTest extends AbstractAggregatePlannerT disableRules ); - assertPlan(TestCase.CASE_17A, - hasChildThat(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class) - .and(input(1, isInstanceOf(IgniteReduceHashAggregate.class) - .and(input(isInstanceOf(IgniteMapHashAggregate.class) - .and(input(isInstanceOf(IgniteLimit.class) - .and(input(isInstanceOf(IgniteExchange.class) - .and(input(isInstanceOf(IgniteSort.class) - .and(input(isTableScan("TEST"))) - )) - )) - )) - )) - )) - ), - disableRules - ); - assertPlan(TestCase.CASE_17B, - hasChildThat(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class) - .and(input(1, isInstanceOf(IgniteReduceHashAggregate.class) - .and(input(isInstanceOf(IgniteMapHashAggregate.class) - .and(input(isInstanceOf(IgniteLimit.class) - .and(input(isInstanceOf(IgniteExchange.class) - .and(input(isInstanceOf(IgniteSort.class) - .and(input(isTableScan("TEST"))) - )) - )) + Predicate<RelNode> aggregateWithLimit = hasChildThat(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class) + .and(input(1, isInstanceOf(IgniteColocatedHashAggregate.class) + .and(input(isInstanceOf(IgniteLimit.class) + .and(input(isInstanceOf(IgniteExchange.class) + .and(input(isInstanceOf(IgniteSort.class) + .and(input(isTableScan("TEST"))) )) )) )) - ), - disableRules + )) ); + assertPlan(TestCase.CASE_17A, aggregateWithLimit, disableRules); + assertPlan(TestCase.CASE_17B, aggregateWithLimit, disableRules); } /** @@ -393,21 +360,15 @@ public class MapReduceHashAggregatePlannerTest extends AbstractAggregatePlannerT */ @Test public void expandDistinctAggregates() throws Exception { - Predicate<? extends RelNode> subtreePredicate = nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class) + Predicate<? extends RelNode> subtreePredicate = nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class) // Check the second aggregation step contains accumulators. // Plan must not contain distinct accumulators. .and(hasAggregate()) .and(not(hasDistinctAggregate())) - .and(input(isInstanceOf(IgniteMapHashAggregate.class) - // Check the first aggregation step is SELECT DISTINCT (doesn't contain any accumulators) - .and(input(isInstanceOf(IgniteReduceHashAggregate.class) - .and(not(hasAggregate())) - .and(hasGroups()) - .and(input(isInstanceOf(IgniteMapHashAggregate.class) - .and(not(hasAggregate())) - .and(hasGroups()) - )) - )) + // Check the first aggregation step is SELECT DISTINCT (doesn't contain any accumulators) + .and(input(isInstanceOf(IgniteColocatedHashAggregate.class) + .and(not(hasAggregate())) + .and(hasGroups()) )) ); @@ -416,21 +377,21 @@ public class MapReduceHashAggregatePlannerTest extends AbstractAggregatePlannerT .and(input(1, subtreePredicate)) ), disableRules); - subtreePredicate = nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class) + subtreePredicate = nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class) // Check the second aggregation step contains accumulators. // Plan must not contain distinct accumulators. .and(hasAggregate()) .and(not(hasDistinctAggregate())) - .and(input(isInstanceOf(IgniteMapHashAggregate.class) - // Check the first aggregation step is SELECT DISTINCT (doesn't contain any accumulators) - .and(input(isInstanceOf(IgniteReduceHashAggregate.class) - .and(not(hasAggregate())) - .and(hasGroups()) - .and(input(isInstanceOf(IgniteExchange.class) - .and(input(isInstanceOf(IgniteMapHashAggregate.class) - .and(not(hasAggregate())) - .and(hasGroups()) - )) + // Check the first aggregation step is SELECT DISTINCT (doesn't contain any accumulators) + .and(input(isInstanceOf(IgniteReduceHashAggregate.class) + .and(not(hasAggregate())) + .and(hasGroups()) + .and(input(isInstanceOf(IgniteExchange.class) + .and(hasDistribution(IgniteDistributions.single())) + .and(input(isInstanceOf(IgniteMapHashAggregate.class) + .and(not(hasAggregate())) + .and(hasGroups()) + .and(input(isInstanceOf(IgniteTableScan.class))) )) )) )) @@ -471,10 +432,17 @@ public class MapReduceHashAggregatePlannerTest extends AbstractAggregatePlannerT )) )); + Predicate<RelNode> colocated = nodeOrAnyChild(isInstanceOf(IgniteExchange.class) + .and(hasDistribution(IgniteDistributions.single())) + .and(input(isInstanceOf(IgniteColocatedHashAggregate.class) + .and(in -> hasAggregates(countMap).test(in.getAggCallList())) + .and(input(isTableScan("TEST"))) + ))); + assertPlan(TestCase.CASE_22, nonColocated, disableRules); assertPlan(TestCase.CASE_22A, nonColocated, disableRules); - assertPlan(TestCase.CASE_22B, nonColocated, disableRules); - assertPlan(TestCase.CASE_22C, nonColocated, disableRules); + assertPlan(TestCase.CASE_22B, colocated, disableRules); + assertPlan(TestCase.CASE_22C, colocated, disableRules); } /** @@ -482,6 +450,9 @@ public class MapReduceHashAggregatePlannerTest extends AbstractAggregatePlannerT */ @Test public void twoPhaseAvgAgg() throws Exception { + Predicate<AggregateCall> avgColocated = (a) -> + Objects.equals(a.getAggregation().getName(), "AVG") && a.getArgList().equals(List.of(1)); + Predicate<AggregateCall> sumMap = (a) -> Objects.equals(a.getAggregation().getName(), "SUM") && a.getArgList().equals(List.of(1)); @@ -494,21 +465,28 @@ public class MapReduceHashAggregatePlannerTest extends AbstractAggregatePlannerT Predicate<AggregateCall> sum0Reduce = (a) -> Objects.equals(a.getAggregation().getName(), "$SUM0") && a.getArgList().equals(List.of(2)); - Predicate<RelNode> nonColocated = hasChildThat(isInstanceOf(IgniteReduceHashAggregate.class) + Predicate<RelNode> nonColocated = nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class) .and(in -> hasAggregates(sumReduce, sum0Reduce).test(in.getAggregateCalls())) .and(input(isInstanceOf(IgniteExchange.class) .and(hasDistribution(IgniteDistributions.single())) .and(input(isInstanceOf(IgniteProject.class) .and(input(isInstanceOf(IgniteMapHashAggregate.class) - .and(in -> hasAggregates(sumMap, countMap).test(in.getAggCallList())) - ) + .and(in -> hasAggregates(sumMap, countMap).test(in.getAggCallList())) )) - )))); + )) + ))); + + Predicate<RelNode> colocated = nodeOrAnyChild(isInstanceOf(IgniteExchange.class) + .and(hasDistribution(IgniteDistributions.single())) + .and(input(isInstanceOf(IgniteColocatedHashAggregate.class) + .and(in -> hasAggregates(avgColocated).test(in.getAggCallList())) + .and(input(isTableScan("TEST"))) + ))); assertPlan(TestCase.CASE_23, nonColocated, disableRules); assertPlan(TestCase.CASE_23A, nonColocated, disableRules); - assertPlan(TestCase.CASE_23B, nonColocated, disableRules); - assertPlan(TestCase.CASE_23C, nonColocated, disableRules); + assertPlan(TestCase.CASE_23B, colocated, disableRules); + assertPlan(TestCase.CASE_23C, colocated, disableRules); } /** @@ -516,19 +494,10 @@ public class MapReduceHashAggregatePlannerTest extends AbstractAggregatePlannerT */ @Test public void countDistinctGroupSetSingle() throws Exception { - Predicate<IgniteReduceHashAggregate> inputAgg = isInstanceOf(IgniteReduceHashAggregate.class) - .and(hasGroupSets(IgniteReduceHashAggregate::getGroupSets, 0)) - .and(input(isInstanceOf(IgniteMapHashAggregate.class) - .and(hasGroupSets(Aggregate::getGroupSets, 1)) - )); - - assertPlan(TestCase.CASE_24_1, nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class) - .and(hasNoGroupSets(IgniteReduceHashAggregate::getGroupSets)) - .and(input(isInstanceOf(IgniteMapHashAggregate.class) - .and(hasNoGroupSets(IgniteMapHashAggregate::getGroupSets)) - .and(input(isInstanceOf(IgniteProject.class).and(input(inputAgg))) - )) - )), + assertPlan(TestCase.CASE_24_1, nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class) + .and(hasNoGroupSets(IgniteColocatedHashAggregate::getGroupSets)) + .and(input(isTableScan("TEST"))) + ), disableRules); } @@ -539,9 +508,9 @@ public class MapReduceHashAggregatePlannerTest extends AbstractAggregatePlannerT public void countDistinctGroupSetHash() throws Exception { checkCountDistinctHash(TestCase.CASE_24_1A); checkCountDistinctHash(TestCase.CASE_24_1B); - checkCountDistinctHash(TestCase.CASE_24_1C); + checkCountDistinctHashColocated(TestCase.CASE_24_1C); checkCountDistinctHash(TestCase.CASE_24_1D); - checkCountDistinctHash(TestCase.CASE_24_1E); + checkCountDistinctHashColocated(TestCase.CASE_24_1E); } /** @@ -566,11 +535,9 @@ public class MapReduceHashAggregatePlannerTest extends AbstractAggregatePlannerT private void checkSimpleAggSingle(TestCase testCase) throws Exception { assertPlan(testCase, - nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class) - .and(input(isInstanceOf(IgniteMapHashAggregate.class) - .and(hasAggregate()) - .and(input(isTableScan("TEST"))) - )) + nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class) + .and(hasAggregate()) + .and(input(isTableScan("TEST"))) ), disableRules ); @@ -592,12 +559,10 @@ public class MapReduceHashAggregatePlannerTest extends AbstractAggregatePlannerT private void checkSimpleAggWithGroupBySingle(TestCase testCase) throws Exception { assertPlan(testCase, - nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class) - .and(input(isInstanceOf(IgniteMapHashAggregate.class) - .and(hasAggregate()) - .and(hasGroups()) - .and(input(isTableScan("TEST"))) - )) + nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class) + .and(hasAggregate()) + .and(hasGroups()) + .and(input(isTableScan("TEST"))) ), disableRules ); @@ -618,21 +583,15 @@ public class MapReduceHashAggregatePlannerTest extends AbstractAggregatePlannerT ); } - private void checkDistinctAggSingle(TestCase testCase) throws Exception { + private void checkSimpleAggWithColocatedGroupByHash(TestCase testCase) throws Exception { assertPlan(testCase, - nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class) - .and(hasAggregate()) - .and(not(hasDistinctAggregate())) - .and(input(isInstanceOf(IgniteMapHashAggregate.class) - .and(hasAggregate()) - .and(not(hasDistinctAggregate())) - .and(input(isInstanceOf(IgniteReduceHashAggregate.class) - .and(not(hasAggregate())) + nodeOrAnyChild(isInstanceOf(IgniteExchange.class) + .and(hasDistribution(IgniteDistributions.single())) + .and(input(isInstanceOf(IgniteProject.class) + .and(input(isInstanceOf(IgniteColocatedHashAggregate.class) + .and(hasAggregate()) .and(hasGroups()) - .and(input(isInstanceOf(IgniteMapHashAggregate.class) - .and(not(hasAggregate())) - .and(input(isTableScan("TEST"))) - )) + .and(input(isTableScan("TEST"))) )) )) ), @@ -640,22 +599,28 @@ public class MapReduceHashAggregatePlannerTest extends AbstractAggregatePlannerT ); } + private void checkDistinctAggSingle(TestCase testCase) throws Exception { + assertPlan(testCase, + nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class) + .and(hasDistinctAggregate()) + .and(input(isTableScan("TEST"))) + ), + disableRules + ); + } + private void checkDistinctAggHash(TestCase testCase) throws Exception { assertPlan(testCase, - nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class) + nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class) .and(hasAggregate()) .and(not(hasDistinctAggregate())) - .and(input(isInstanceOf(IgniteMapHashAggregate.class) - .and(hasAggregate()) - .and(not(hasDistinctAggregate())) - .and(input(isInstanceOf(IgniteReduceHashAggregate.class) - .and(not(hasAggregate())) - .and(hasGroups()) - .and(input(isInstanceOf(IgniteExchange.class) - .and(input(isInstanceOf(IgniteMapHashAggregate.class) - .and(not(hasAggregate())) - .and(input(isTableScan("TEST"))) - )) + .and(input(isInstanceOf(IgniteReduceHashAggregate.class) + .and(not(hasAggregate())) + .and(hasGroups()) + .and(input(isInstanceOf(IgniteExchange.class) + .and(input(isInstanceOf(IgniteMapHashAggregate.class) + .and(not(hasAggregate())) + .and(input(isTableScan("TEST"))) )) )) ))), @@ -663,22 +628,27 @@ public class MapReduceHashAggregatePlannerTest extends AbstractAggregatePlannerT ); } - private void checkDistinctAggWithGroupBySingle(TestCase testCase) throws Exception { + private void checkColocatedDistinctAggHash(TestCase testCase) throws Exception { assertPlan(testCase, - nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class) - .and(hasAggregate()) + nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class) .and(not(hasDistinctAggregate())) - .and(input(isInstanceOf(IgniteMapHashAggregate.class) - .and(input(isInstanceOf(IgniteReduceHashAggregate.class) + .and(input(isInstanceOf(IgniteExchange.class) + .and(hasDistribution(IgniteDistributions.single())) + .and(input(isInstanceOf(IgniteColocatedHashAggregate.class) .and(not(hasAggregate())) .and(hasGroups()) - .and(input(isInstanceOf(IgniteMapHashAggregate.class) - .and(not(hasAggregate())) - .and(hasGroups()) - .and(input(isTableScan("TEST"))) - )) + .and(input(isTableScan("TEST"))) )) - )) + ))), + disableRules + ); + } + + private void checkDistinctAggWithGroupBySingle(TestCase testCase) throws Exception { + assertPlan(testCase, + nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class) + .and(hasDistinctAggregate()) + .and(input(isTableScan("TEST"))) ), disableRules ); @@ -686,19 +656,17 @@ public class MapReduceHashAggregatePlannerTest extends AbstractAggregatePlannerT private void checkDistinctAggWithGroupByHash(TestCase testCase) throws Exception { assertPlan(testCase, - nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class) + nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class) .and(hasAggregate()) .and(not(hasDistinctAggregate())) - .and(input(isInstanceOf(IgniteMapHashAggregate.class) - .and(input(isInstanceOf(IgniteReduceHashAggregate.class) - .and(not(hasAggregate())) - .and(hasGroups()) - .and(input(isInstanceOf(IgniteExchange.class) - .and(input(isInstanceOf(IgniteMapHashAggregate.class) - .and(not(hasAggregate())) - .and(hasGroups()) - .and(input(isTableScan("TEST"))) - )) + .and(input(isInstanceOf(IgniteReduceHashAggregate.class) + .and(not(hasAggregate())) + .and(hasGroups()) + .and(input(isInstanceOf(IgniteExchange.class) + .and(input(isInstanceOf(IgniteMapHashAggregate.class) + .and(not(hasAggregate())) + .and(hasGroups()) + .and(input(isTableScan("TEST"))) )) )) )) @@ -709,34 +677,22 @@ public class MapReduceHashAggregatePlannerTest extends AbstractAggregatePlannerT private void checkDistinctAggWithColocatedGroupByHash(TestCase testCase) throws Exception { assertPlan(testCase, - nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class) - .and(hasAggregate()) - .and(not(hasDistinctAggregate())) - .and(input(isInstanceOf(IgniteMapHashAggregate.class) - .and(input(isInstanceOf(IgniteReduceHashAggregate.class) - .and(not(hasAggregate())) - .and(hasGroups()) - .and(input(isInstanceOf(IgniteExchange.class) - .and(input(isInstanceOf(IgniteMapHashAggregate.class) - .and(not(hasAggregate())) - .and(hasGroups()) - .and(input(isTableScan("TEST"))) - )) - )) - )) - )) - ), + nodeOrAnyChild(isInstanceOf(IgniteExchange.class) + .and(hasDistribution(IgniteDistributions.single())) + .and(nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class) + .and(hasDistinctAggregate()) + .and(hasGroups()) + .and(input(isTableScan("TEST"))) + ))), disableRules ); } private void checkAggWithGroupByIndexColumnsSingle(TestCase testCase) throws Exception { assertPlan(testCase, - nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class) - .and(input(isInstanceOf(IgniteMapHashAggregate.class) - .and(hasAggregate()) - .and(input(isTableScan("TEST"))) - )) + nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class) + .and(hasAggregate()) + .and(input(isTableScan("TEST"))) ), disableRules ); @@ -756,16 +712,27 @@ public class MapReduceHashAggregatePlannerTest extends AbstractAggregatePlannerT ); } + private void checkAggWithColocatedGroupByIndexColumnsHash(TestCase testCase) throws Exception { + assertPlan(testCase, + nodeOrAnyChild(isInstanceOf(IgniteExchange.class) + .and(hasDistribution(IgniteDistributions.single())) + .and(input(isInstanceOf(IgniteProject.class) + .and(input(isInstanceOf(IgniteColocatedHashAggregate.class) + .and(hasAggregate()) + .and(input(isTableScan("TEST"))) + )) + )) + ), + disableRules + ); + } + private void checkGroupWithNoAggregateSingle(TestCase testCase) throws Exception { assertPlan(testCase, - nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class) + nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class) .and(not(hasAggregate())) .and(hasGroups()) - .and(input(isInstanceOf(IgniteMapHashAggregate.class) - .and(not(hasAggregate())) - .and(hasGroups()) - .and(input(isTableScan("TEST"))) - )) + .and(input(isTableScan("TEST"))) ), disableRules ); @@ -788,16 +755,27 @@ public class MapReduceHashAggregatePlannerTest extends AbstractAggregatePlannerT ); } + + private void checkColocatedGroupWithNoAggregateHash(TestCase testCase) throws Exception { + assertPlan(testCase, + nodeOrAnyChild(isInstanceOf(IgniteExchange.class) + .and(input(isInstanceOf(IgniteColocatedHashAggregate.class) + .and(not(hasAggregate())) + .and(hasGroups()) + .and(input(isTableScan("TEST"))) + )) + ), + disableRules + ); + } + private void checkGroupsWithOrderByGroupColumnsSingle(TestCase testCase, RelCollation collation) throws Exception { assertPlan(testCase, isInstanceOf(IgniteSort.class) .and(s -> s.collation().equals(collation)) - .and(input(isInstanceOf(IgniteProject.class) - .and(input(isInstanceOf(IgniteReduceHashAggregate.class) - .and(input(isInstanceOf(IgniteMapHashAggregate.class) - .and(input(isTableScan("TEST"))) - )) - )) + .and(input(isInstanceOf(IgniteColocatedHashAggregate.class) + .and(input(isTableScan("TEST"))) + )), disableRules ); @@ -820,7 +798,6 @@ public class MapReduceHashAggregatePlannerTest extends AbstractAggregatePlannerT ); } - private void checkCountDistinctHash(TestCase testCase) throws Exception { Predicate<IgniteReduceHashAggregate> inputAgg = isInstanceOf(IgniteReduceHashAggregate.class) .and(hasGroupSets(IgniteReduceHashAggregate::getGroupSets, 0)) @@ -831,27 +808,33 @@ public class MapReduceHashAggregatePlannerTest extends AbstractAggregatePlannerT )) )); - assertPlan(testCase, nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class) - .and(hasNoGroupSets(IgniteReduceHashAggregate::getGroupSets)) - .and(input(isInstanceOf(IgniteMapHashAggregate.class) - .and(hasNoGroupSets(IgniteMapHashAggregate::getGroupSets)) - .and(input(isInstanceOf(IgniteProject.class).and(input(inputAgg))) - )) + assertPlan(testCase, nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class) + .and(hasNoGroupSets(Aggregate::getGroupSets)) + .and(input(isInstanceOf(IgniteProject.class).and(input(inputAgg))) )), disableRules); } + private void checkCountDistinctHashColocated(TestCase testCase) throws Exception { + assertPlan(testCase, nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class) + .and(hasNoGroupSets(Aggregate::getGroupSets)) + .and(input(isInstanceOf(IgniteExchange.class) + .and(hasDistribution(IgniteDistributions.single())) + .and(input(isInstanceOf(IgniteColocatedHashAggregate.class) + .and(hasGroupSets(Aggregate::getGroupSets, 1)) + )) + ))), + disableRules); + } + private void checkDerivedCollationWithOrderByGroupColumnSingle(TestCase testCase) throws Exception { RelCollation requiredCollation = RelCollations.of(TraitUtils.createFieldCollation(0, Collation.DESC_NULLS_FIRST)); assertPlan(testCase, isInstanceOf(IgniteSort.class) - .and(hasCollation(requiredCollation)) - .and(nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class) - .and(hasAggregate()) - .and(input(isInstanceOf(IgniteMapHashAggregate.class) + .and(hasCollation(requiredCollation)) + .and(nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class) .and(hasAggregate()) - )) - )), + )), disableRules); } @@ -859,16 +842,16 @@ public class MapReduceHashAggregatePlannerTest extends AbstractAggregatePlannerT RelCollation requiredCollation = RelCollations.of(TraitUtils.createFieldCollation(0, Collation.DESC_NULLS_FIRST)); assertPlan(testCase, isInstanceOf(IgniteSort.class) - .and(hasCollation(requiredCollation)) - .and(nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class) - .and(hasAggregate()) - .and(input(isInstanceOf(IgniteExchange.class) - .and(hasDistribution(IgniteDistributions.single())) - .and(input(isInstanceOf(IgniteMapHashAggregate.class) - .and(hasAggregate()) + .and(hasCollation(requiredCollation)) + .and(nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class) + .and(hasAggregate()) + .and(input(isInstanceOf(IgniteExchange.class) + .and(hasDistribution(IgniteDistributions.single())) + .and(input(isInstanceOf(IgniteMapHashAggregate.class) + .and(hasAggregate()) + )) )) - )) - )), + )), disableRules); } @@ -880,11 +863,7 @@ public class MapReduceHashAggregatePlannerTest extends AbstractAggregatePlannerT assertPlan(testCase, isInstanceOf(IgniteSort.class) .and(hasCollation(outputCollation)) - .and(input(isInstanceOf(IgniteProject.class) - .and(input(isInstanceOf(IgniteReduceHashAggregate.class) - .and(input(isInstanceOf(IgniteMapHashAggregate.class))) - )) - )), + .and(input(isInstanceOf(IgniteColocatedHashAggregate.class))), disableRules); } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java index 76fca16b6c..e570a0e384 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java @@ -52,6 +52,7 @@ import org.junit.jupiter.api.Test; */ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerTest { private final String[] disableRules = { + "HashAggregateExchangeTransposeRule", "MapReduceHashAggregateConverterRule", "ColocatedHashAggregateConverterRule", };
