This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch aggregates-rules
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 10d2551874280cace0173655ff89267f1d416a5d
Author: amashenkov <[email protected]>
AuthorDate: Tue Apr 16 01:53:07 2024 +0300

    fixup! Add hash aggregate push down rule
---
 .../internal/sql/engine/prepare/PlannerPhase.java  |  2 +-
 .../engine/rule/HashAggregateConverterRule.java    |  9 ++----
 .../rule/HashAggregateExchangeTransposeRule.java   | 33 ++++++++++++++--------
 .../sql/engine/planner/AggregatePlannerTest.java   |  1 +
 4 files changed, 26 insertions(+), 19 deletions(-)

diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
index cfef67a48d..de6b738f29 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
@@ -236,7 +236,7 @@ public enum PlannerPhase {
             LogicalScanConverterRule.SYSTEM_VIEW_SCAN,
             HashAggregateConverterRule.COLOCATED,
             // HashAggregateConverterRule.MAP_REDUCE,
-            HashAggregateExchangeTransposeRule.INSTANCE,
+            HashAggregateExchangeTransposeRule.HASH_AGGREGATE_PUSH_DOWN,
             SortAggregateConverterRule.COLOCATED,
             SortAggregateExchangeTransposeRule.SORT_AGGREGATE_PUSH_DOWN,
             // SortAggregateConverterRule.MAP_REDUCE,
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateConverterRule.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateConverterRule.java
index 549394ce90..70a088c4d0 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateConverterRule.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateConverterRule.java
@@ -74,15 +74,12 @@ public class HashAggregateConverterRule {
 
         /** {@inheritDoc} */
         @Override
-        protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq,
-                LogicalAggregate agg) {
-            if (HintUtils.isExpandDistinctAggregate(agg)) {
-                return null;
-            }
-
+        protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq, LogicalAggregate agg) {
             RelOptCluster cluster = agg.getCluster();
+
             RelTraitSet inTrait = cluster.traitSetOf(IgniteConvention.INSTANCE).replace(IgniteDistributions.single());
             RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE).replace(IgniteDistributions.single());
+
             RelNode input = convert(agg.getInput(), inTrait);
 
             return new IgniteColocatedHashAggregate(
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateExchangeTransposeRule.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateExchangeTransposeRule.java
index d1da074376..22b7d12dda 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateExchangeTransposeRule.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateExchangeTransposeRule.java
@@ -45,7 +45,7 @@ import org.apache.ignite.internal.sql.engine.rel.agg.IgniteMapHashAggregate;
 import org.apache.ignite.internal.sql.engine.rel.agg.IgniteReduceHashAggregate;
 import org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates;
 import org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates.AggregateRelBuilder;
-import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.sql.engine.util.HintUtils;
 import org.immutables.value.Value;
@@ -56,7 +56,7 @@ import org.immutables.value.Value;
  */
 @Value.Enclosing
 public class HashAggregateExchangeTransposeRule extends RelRule<HashAggregateExchangeTransposeRule.Config> {
-    public static final RelOptRule INSTANCE = HashAggregateExchangeTransposeRule.Config.INSTANCE.toRule();
+    public static final RelOptRule HASH_AGGREGATE_PUSH_DOWN = HashAggregateExchangeTransposeRule.Config.DEFAULT.toRule();
 
     HashAggregateExchangeTransposeRule(HashAggregateExchangeTransposeRule.Config cfg) {
         super(cfg);
@@ -71,6 +71,7 @@ public class HashAggregateExchangeTransposeRule extends RelRule<HashAggregateExc
         assert !HintUtils.isExpandDistinctAggregate(aggregate);
 
         return exchange.distribution() == single()
+                && hashAlike(distribution(exchange.getInput()))
                 && (canBePushedDown(aggregate, exchange) || canBeConvertedToMapReduce(aggregate));
     }
 
@@ -92,6 +93,11 @@ public class HashAggregateExchangeTransposeRule extends RelRule<HashAggregateExc
                 && !complexDistinctAgg(aggregate.getAggCallList());
     }
 
+    private static boolean hashAlike(IgniteDistribution distribution) {
+        return distribution.getType() == Type.HASH_DISTRIBUTED
+                || distribution.getType() == Type.RANDOM_DISTRIBUTED;
+    }
+
     @Override
     public void onMatch(RelOptRuleCall call) {
         IgniteColocatedHashAggregate agg = call.rel(0);
@@ -103,14 +109,18 @@ public class HashAggregateExchangeTransposeRule extends RelRule<HashAggregateExc
             assert agg.getGroupSets().size() == 1;
             assert agg.collation().getKeys().isEmpty();
 
+            RelTraitSet inTrait = cluster.traitSetOf(IgniteConvention.INSTANCE).replace(distribution(exchange.getInput()));
+            RelTraitSet outTrait =  agg.getTraitSet().replace(distribution(exchange.getInput()));
+
+            cluster.getPlanner().prune(agg);
+
             IgniteExchange relNode = new IgniteExchange(
                     cluster,
                     agg.getTraitSet(),
                     new IgniteColocatedHashAggregate(
                             cluster,
-                            agg.getTraitSet()
-                                    .replace(distribution(exchange.getInput())),
-                            exchange.getInput(),
+                            outTrait,
+                            convert(exchange.getInput(), inTrait),
                             agg.getGroupSet(),
                             agg.getGroupSets(),
                             agg.getAggCallList()
@@ -129,8 +139,6 @@ public class HashAggregateExchangeTransposeRule extends RelRule<HashAggregateExc
         RelTraitSet inTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
         RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
 
-        RelTraitSet reducePhaseTraits = outTrait.replace(IgniteDistributions.single());
-
         AggregateRelBuilder relBuilder = new AggregateRelBuilder() {
             @Override
             public IgniteRel makeMapAgg(RelOptCluster cluster, RelNode input, ImmutableBitSet groupSet,
@@ -138,8 +146,9 @@ public class HashAggregateExchangeTransposeRule extends RelRule<HashAggregateExc
 
                 return new IgniteMapHashAggregate(
                         cluster,
-                        outTrait.replace(IgniteDistributions.random()),
-                        input,
+                        outTrait.replace(distribution(input)),
+                        // TODO: without conversion sibling rule SortedAggregateExchangeTransposeRules doesn't apply. why?
+                        convert(input, inTrait.replace(distribution(input))),
                         groupSet,
                         groupSets,
                         aggregateCalls
@@ -166,8 +175,8 @@ public class HashAggregateExchangeTransposeRule extends RelRule<HashAggregateExc
 
                 return new IgniteReduceHashAggregate(
                         cluster,
-                        reducePhaseTraits,
-                        convert(input, inTrait.replace(single())),
+                        outTrait.replace(single()),
+                        input,
                         groupSet,
                         groupSets,
                         aggregateCalls,
@@ -183,7 +192,7 @@ public class HashAggregateExchangeTransposeRule extends RelRule<HashAggregateExc
     @SuppressWarnings({"ClassNameSameAsAncestorName", "InnerClassFieldHidesOuterClassField"})
     @Value.Immutable
     public interface Config extends RelRule.Config {
-        HashAggregateExchangeTransposeRule.Config INSTANCE = ImmutableHashAggregateExchangeTransposeRule.Config.of()
+        HashAggregateExchangeTransposeRule.Config DEFAULT = ImmutableHashAggregateExchangeTransposeRule.Config.of()
                 .withDescription("HashAggregateExchangeTransposeRule")
                 .withOperandSupplier(o0 ->
                         o0.operand(IgniteColocatedHashAggregate.class)
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
index b5511c2068..535db7a61f 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
@@ -407,6 +407,7 @@ public class AggregatePlannerTest extends AbstractAggregatePlannerTest {
                         .and(not(hasAggregate()))
                         .and(hasGroups())
                         .and(input(isInstanceOf(IgniteExchange.class)
+                                .and(hasDistribution(single()))
                                 .and(input(isInstanceOf(IgniteMapSortAggregate.class)
                                         .and(not(hasAggregate()))
                                         .and(hasGroups())

Reply via email to