This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch aggregates-rules in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 10d2551874280cace0173655ff89267f1d416a5d Author: amashenkov <[email protected]> AuthorDate: Tue Apr 16 01:53:07 2024 +0300 fixup! Add hash aggregate push down rule --- .../internal/sql/engine/prepare/PlannerPhase.java | 2 +- .../engine/rule/HashAggregateConverterRule.java | 9 ++---- .../rule/HashAggregateExchangeTransposeRule.java | 33 ++++++++++++++-------- .../sql/engine/planner/AggregatePlannerTest.java | 1 + 4 files changed, 26 insertions(+), 19 deletions(-) diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java index cfef67a48d..de6b738f29 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java @@ -236,7 +236,7 @@ public enum PlannerPhase { LogicalScanConverterRule.SYSTEM_VIEW_SCAN, HashAggregateConverterRule.COLOCATED, // HashAggregateConverterRule.MAP_REDUCE, - HashAggregateExchangeTransposeRule.INSTANCE, + HashAggregateExchangeTransposeRule.HASH_AGGREGATE_PUSH_DOWN, SortAggregateConverterRule.COLOCATED, SortAggregateExchangeTransposeRule.SORT_AGGREGATE_PUSH_DOWN, // SortAggregateConverterRule.MAP_REDUCE, diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateConverterRule.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateConverterRule.java index 549394ce90..70a088c4d0 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateConverterRule.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateConverterRule.java @@ -74,15 +74,12 @@ public class HashAggregateConverterRule { /** {@inheritDoc} */ @Override - protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq, - LogicalAggregate agg) { - if (HintUtils.isExpandDistinctAggregate(agg)) { - return null; - } - + protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq, LogicalAggregate agg) { RelOptCluster cluster = agg.getCluster(); + RelTraitSet inTrait = cluster.traitSetOf(IgniteConvention.INSTANCE).replace(IgniteDistributions.single()); RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE).replace(IgniteDistributions.single()); + RelNode input = convert(agg.getInput(), inTrait); return new IgniteColocatedHashAggregate( diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateExchangeTransposeRule.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateExchangeTransposeRule.java index d1da074376..22b7d12dda 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateExchangeTransposeRule.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateExchangeTransposeRule.java @@ -45,7 +45,7 @@ import org.apache.ignite.internal.sql.engine.rel.agg.IgniteMapHashAggregate; import org.apache.ignite.internal.sql.engine.rel.agg.IgniteReduceHashAggregate; import org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates; import org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates.AggregateRelBuilder; -import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions; +import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution; import org.apache.ignite.internal.sql.engine.util.Commons; import org.apache.ignite.internal.sql.engine.util.HintUtils; import org.immutables.value.Value; @@ -56,7 +56,7 @@ import org.immutables.value.Value; */ @Value.Enclosing public class HashAggregateExchangeTransposeRule extends RelRule<HashAggregateExchangeTransposeRule.Config> { - public static final RelOptRule INSTANCE = HashAggregateExchangeTransposeRule.Config.INSTANCE.toRule(); + public static final RelOptRule HASH_AGGREGATE_PUSH_DOWN = HashAggregateExchangeTransposeRule.Config.DEFAULT.toRule(); HashAggregateExchangeTransposeRule(HashAggregateExchangeTransposeRule.Config cfg) { super(cfg); @@ -71,6 +71,7 @@ public class HashAggregateExchangeTransposeRule extends RelRule<HashAggregateExc assert !HintUtils.isExpandDistinctAggregate(aggregate); return exchange.distribution() == single() + && hashAlike(distribution(exchange.getInput())) && (canBePushedDown(aggregate, exchange) || canBeConvertedToMapReduce(aggregate)); } @@ -92,6 +93,11 @@ public class HashAggregateExchangeTransposeRule extends RelRule<HashAggregateExc && !complexDistinctAgg(aggregate.getAggCallList()); } + private static boolean hashAlike(IgniteDistribution distribution) { + return distribution.getType() == Type.HASH_DISTRIBUTED + || distribution.getType() == Type.RANDOM_DISTRIBUTED; + } + @Override public void onMatch(RelOptRuleCall call) { IgniteColocatedHashAggregate agg = call.rel(0); @@ -103,14 +109,18 @@ public class HashAggregateExchangeTransposeRule extends RelRule<HashAggregateExc assert agg.getGroupSets().size() == 1; assert agg.collation().getKeys().isEmpty(); + RelTraitSet inTrait = cluster.traitSetOf(IgniteConvention.INSTANCE).replace(distribution(exchange.getInput())); + RelTraitSet outTrait = agg.getTraitSet().replace(distribution(exchange.getInput())); + + cluster.getPlanner().prune(agg); + IgniteExchange relNode = new IgniteExchange( cluster, agg.getTraitSet(), new IgniteColocatedHashAggregate( cluster, - agg.getTraitSet() - .replace(distribution(exchange.getInput())), - exchange.getInput(), + outTrait, + convert(exchange.getInput(), inTrait), agg.getGroupSet(), agg.getGroupSets(), agg.getAggCallList() @@ -129,8 +139,6 @@ public class HashAggregateExchangeTransposeRule extends RelRule<HashAggregateExc RelTraitSet inTrait = cluster.traitSetOf(IgniteConvention.INSTANCE); RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE); - RelTraitSet reducePhaseTraits = outTrait.replace(IgniteDistributions.single()); - AggregateRelBuilder relBuilder = new AggregateRelBuilder() { @Override public IgniteRel makeMapAgg(RelOptCluster cluster, RelNode input, ImmutableBitSet groupSet, @@ -138,8 +146,9 @@ public class HashAggregateExchangeTransposeRule extends RelRule<HashAggregateExc return new IgniteMapHashAggregate( cluster, - outTrait.replace(IgniteDistributions.random()), - input, + outTrait.replace(distribution(input)), + // TODO: without conversion sibling rule SortedAggregateExchangeTransposeRules doesn't apply. why? + convert(input, inTrait.replace(distribution(input))), groupSet, groupSets, aggregateCalls @@ -166,8 +175,8 @@ public class HashAggregateExchangeTransposeRule extends RelRule<HashAggregateExc return new IgniteReduceHashAggregate( cluster, - reducePhaseTraits, - convert(input, inTrait.replace(single())), + outTrait.replace(single()), + input, groupSet, groupSets, aggregateCalls, @@ -183,7 +192,7 @@ public class HashAggregateExchangeTransposeRule extends RelRule<HashAggregateExc @SuppressWarnings({"ClassNameSameAsAncestorName", "InnerClassFieldHidesOuterClassField"}) @Value.Immutable public interface Config extends RelRule.Config { - HashAggregateExchangeTransposeRule.Config INSTANCE = ImmutableHashAggregateExchangeTransposeRule.Config.of() + HashAggregateExchangeTransposeRule.Config DEFAULT = ImmutableHashAggregateExchangeTransposeRule.Config.of() .withDescription("HashAggregateExchangeTransposeRule") .withOperandSupplier(o0 -> o0.operand(IgniteColocatedHashAggregate.class) diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java index b5511c2068..535db7a61f 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java @@ -407,6 +407,7 @@ public class AggregatePlannerTest extends AbstractAggregatePlannerTest { .and(not(hasAggregate())) .and(hasGroups()) .and(input(isInstanceOf(IgniteExchange.class) + .and(hasDistribution(single())) .and(input(isInstanceOf(IgniteMapSortAggregate.class) .and(not(hasAggregate())) .and(hasGroups())
