This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch aggregates-rules
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit d386dcd4e7ce0954de01089be4c7481a5fa8c2ce
Author: amashenkov <[email protected]>
AuthorDate: Mon Apr 15 21:35:04 2024 +0300

    Add hash aggregate push down rule
---
 .../internal/sql/engine/prepare/PlannerPhase.java  |   3 +-
 .../engine/rule/HashAggregateConverterRule.java    |  23 +-
 .../rule/HashAggregateExchangeTransposeRule.java   | 201 +++++++++
 .../planner/ColocatedHashAggregatePlannerTest.java |   1 +
 .../planner/ColocatedSortAggregatePlannerTest.java |   1 +
 .../planner/MapReduceHashAggregatePlannerTest.java | 465 ++++++++++-----------
 .../planner/MapReduceSortAggregatePlannerTest.java |   1 +
 7 files changed, 445 insertions(+), 250 deletions(-)

diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
index e5092dd2b1..352172a051 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
@@ -235,7 +235,8 @@ public enum PlannerPhase {
             LogicalScanConverterRule.TABLE_SCAN,
             LogicalScanConverterRule.SYSTEM_VIEW_SCAN,
             HashAggregateConverterRule.COLOCATED,
-            HashAggregateConverterRule.MAP_REDUCE,
+            // HashAggregateConverterRule.MAP_REDUCE,
+            HashAggregateExchangeTransposeRule.INSTANCE,
             SortAggregateConverterRule.COLOCATED,
             SortAggregateExchangeTransposeRule.INSTANCE,
             // SortAggregateConverterRule.MAP_REDUCE,
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateConverterRule.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateConverterRule.java
index 6542b6fc03..549394ce90 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateConverterRule.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateConverterRule.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.PhysicalNode;
 import org.apache.calcite.rel.RelNode;
@@ -64,6 +65,13 @@ public class HashAggregateConverterRule {
             super(LogicalAggregate.class, 
"ColocatedHashAggregateConverterRule");
         }
 
+        @Override
+        public boolean matches(RelOptRuleCall call) {
+            LogicalAggregate aggregate = call.rel(0);
+
+            return !HintUtils.isExpandDistinctAggregate(aggregate);
+        }
+
         /** {@inheritDoc} */
         @Override
         protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery 
mq,
@@ -93,16 +101,19 @@ public class HashAggregateConverterRule {
             super(LogicalAggregate.class, 
"MapReduceHashAggregateConverterRule");
         }
 
+        @Override
+        public boolean matches(RelOptRuleCall call) {
+            LogicalAggregate aggregate = call.rel(0);
+
+            return !HintUtils.isExpandDistinctAggregate(aggregate)
+                    && canBeImplementedAsMapReduce(aggregate.getAggCallList())
+                    && !complexDistinctAgg(aggregate.getAggCallList());
+        }
+
         /** {@inheritDoc} */
         @Override
         protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery 
mq,
                 LogicalAggregate agg) {
-            if (complexDistinctAgg(agg.getAggCallList())
-                    || !canBeImplementedAsMapReduce(agg.getAggCallList())
-                    || HintUtils.isExpandDistinctAggregate(agg)) {
-                return null;
-            }
-
             RelOptCluster cluster = agg.getCluster();
             RelTraitSet inTrait = 
cluster.traitSetOf(IgniteConvention.INSTANCE);
             RelTraitSet outTrait = 
cluster.traitSetOf(IgniteConvention.INSTANCE);
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateExchangeTransposeRule.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateExchangeTransposeRule.java
new file mode 100644
index 0000000000..d1da074376
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateExchangeTransposeRule.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.rule;
+
+import static 
org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates.canBeImplementedAsMapReduce;
+import static 
org.apache.ignite.internal.sql.engine.trait.IgniteDistributions.single;
+import static 
org.apache.ignite.internal.sql.engine.trait.TraitUtils.distribution;
+import static 
org.apache.ignite.internal.sql.engine.util.PlanUtils.complexDistinctAgg;
+import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+
+import java.util.List;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelDistribution.Type;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.mapping.Mapping;
+import org.apache.ignite.internal.sql.engine.rel.IgniteConvention;
+import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
+import org.apache.ignite.internal.sql.engine.rel.IgniteProject;
+import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
+import 
org.apache.ignite.internal.sql.engine.rel.agg.IgniteColocatedHashAggregate;
+import org.apache.ignite.internal.sql.engine.rel.agg.IgniteMapHashAggregate;
+import org.apache.ignite.internal.sql.engine.rel.agg.IgniteReduceHashAggregate;
+import org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates;
+import 
org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates.AggregateRelBuilder;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.sql.engine.util.HintUtils;
+import org.immutables.value.Value;
+
+/**
+ * A rule that pushes {@link IgniteColocatedHashAggregate} node under {@link 
IgniteExchange} if possible, otherwise, splits into map-reduce
+ * phases.
+ */
[email protected]
+public class HashAggregateExchangeTransposeRule extends 
RelRule<HashAggregateExchangeTransposeRule.Config> {
+    public static final RelOptRule INSTANCE = 
HashAggregateExchangeTransposeRule.Config.INSTANCE.toRule();
+
+    
HashAggregateExchangeTransposeRule(HashAggregateExchangeTransposeRule.Config 
cfg) {
+        super(cfg);
+    }
+
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+        IgniteColocatedHashAggregate aggregate = call.rel(0);
+        IgniteExchange exchange = call.rel(1);
+
+        // Conditions already checked by IgniteColocatedHashAggregate
+        assert !HintUtils.isExpandDistinctAggregate(aggregate);
+
+        return exchange.distribution() == single()
+                && (canBePushedDown(aggregate, exchange) || 
canBeConvertedToMapReduce(aggregate));
+    }
+
+    /**
+     * Returns {@code true} if an aggregate groups by distribution columns and 
therefore, can be pushed down under Exchange as-is,
+     * {@code false} otherwise.
+     */
+    private static boolean canBePushedDown(IgniteColocatedHashAggregate 
aggregate, IgniteExchange exchange) {
+        return distribution(exchange.getInput()).getType() == 
Type.HASH_DISTRIBUTED
+                && !nullOrEmpty(aggregate.getGroupSet())
+                && 
ImmutableBitSet.of(distribution(exchange.getInput()).getKeys()).equals(aggregate.getGroupSet());
+    }
+
+    /**
+     * Returns {@code true} if an aggregate can be converted to map-reduce 
phases, {@code false} otherwise.
+     */
+    private static boolean 
canBeConvertedToMapReduce(IgniteColocatedHashAggregate aggregate) {
+        return canBeImplementedAsMapReduce(aggregate.getAggCallList())
+                && !complexDistinctAgg(aggregate.getAggCallList());
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        IgniteColocatedHashAggregate agg = call.rel(0);
+        IgniteExchange exchange = call.rel(1);
+
+        RelOptCluster cluster = agg.getCluster();
+
+        if (canBePushedDown(agg, exchange)) {
+            assert agg.getGroupSets().size() == 1;
+            assert agg.collation().getKeys().isEmpty();
+
+            IgniteExchange relNode = new IgniteExchange(
+                    cluster,
+                    agg.getTraitSet(),
+                    new IgniteColocatedHashAggregate(
+                            cluster,
+                            agg.getTraitSet()
+                                    
.replace(distribution(exchange.getInput())),
+                            exchange.getInput(),
+                            agg.getGroupSet(),
+                            agg.getGroupSets(),
+                            agg.getAggCallList()
+                    ),
+                    exchange.distribution()
+            );
+
+            call.transformTo(relNode);
+
+            return;
+        }
+
+        // Create mapping to adjust fields on REDUCE phase.
+        Mapping fieldMappingOnReduce = 
Commons.trimmingMapping(agg.getGroupSet().length(), agg.getGroupSet());
+
+        RelTraitSet inTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
+        RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
+
+        RelTraitSet reducePhaseTraits = 
outTrait.replace(IgniteDistributions.single());
+
+        AggregateRelBuilder relBuilder = new AggregateRelBuilder() {
+            @Override
+            public IgniteRel makeMapAgg(RelOptCluster cluster, RelNode input, 
ImmutableBitSet groupSet,
+                    List<ImmutableBitSet> groupSets, List<AggregateCall> 
aggregateCalls) {
+
+                return new IgniteMapHashAggregate(
+                        cluster,
+                        outTrait.replace(IgniteDistributions.random()),
+                        input,
+                        groupSet,
+                        groupSets,
+                        aggregateCalls
+                );
+            }
+
+            @Override
+            public IgniteRel makeProject(RelOptCluster cluster, RelNode input, 
List<RexNode> reduceInputExprs,
+                    RelDataType projectRowType) {
+
+                return new IgniteProject(
+                        agg.getCluster(),
+                        input.getTraitSet(),
+                        input,
+                        reduceInputExprs,
+                        projectRowType
+                );
+            }
+
+            @Override
+            public IgniteRel makeReduceAgg(RelOptCluster cluster, RelNode 
input, ImmutableBitSet groupSet,
+                    List<ImmutableBitSet> groupSets, List<AggregateCall> 
aggregateCalls, RelDataType outputType) {
+                assert distribution(input) == single();
+
+                return new IgniteReduceHashAggregate(
+                        cluster,
+                        reducePhaseTraits,
+                        convert(input, inTrait.replace(single())),
+                        groupSet,
+                        groupSets,
+                        aggregateCalls,
+                        outputType
+                );
+            }
+        };
+
+        call.transformTo(MapReduceAggregates.buildAggregates(agg, exchange, 
relBuilder, fieldMappingOnReduce));
+    }
+
+    /** Configuration. */
+    @SuppressWarnings({"ClassNameSameAsAncestorName", 
"InnerClassFieldHidesOuterClassField"})
+    @Value.Immutable
+    public interface Config extends RelRule.Config {
+        HashAggregateExchangeTransposeRule.Config INSTANCE = 
ImmutableHashAggregateExchangeTransposeRule.Config.of()
+                .withDescription("HashAggregateExchangeTransposeRule")
+                .withOperandSupplier(o0 ->
+                        o0.operand(IgniteColocatedHashAggregate.class)
+                                .oneInput(o1 ->
+                                        o1.operand(IgniteExchange.class)
+                                                .anyInputs()))
+                .as(HashAggregateExchangeTransposeRule.Config.class);
+
+        /** {@inheritDoc} */
+        @Override
+        default HashAggregateExchangeTransposeRule toRule() {
+            return new HashAggregateExchangeTransposeRule(this);
+        }
+    }
+}
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java
index 7f6833443e..d5c184bbbf 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java
@@ -48,6 +48,7 @@ import org.junit.jupiter.api.Test;
 public class ColocatedHashAggregatePlannerTest extends 
AbstractAggregatePlannerTest {
 
     private final String[] disableRules = {
+            "HashAggregateExchangeTransposeRule",
             "MapReduceHashAggregateConverterRule",
             "SortAggregateExchangeTransposeRule",
             "MapReduceSortAggregateConverterRule",
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java
index 3a92ca2dbf..38219bda43 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java
@@ -47,6 +47,7 @@ import org.junit.jupiter.api.Test;
  */
 public class ColocatedSortAggregatePlannerTest extends 
AbstractAggregatePlannerTest {
     private final String[] disableRules = {
+            "HashAggregateExchangeTransposeRule",
             "MapReduceHashAggregateConverterRule",
             "MapReduceSortAggregateConverterRule",
             "SortAggregateExchangeTransposeRule",
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java
index 52d4333dc6..ba05af4de4 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java
@@ -33,6 +33,8 @@ import org.apache.ignite.internal.sql.engine.rel.IgniteLimit;
 import org.apache.ignite.internal.sql.engine.rel.IgniteMergeJoin;
 import org.apache.ignite.internal.sql.engine.rel.IgniteProject;
 import org.apache.ignite.internal.sql.engine.rel.IgniteSort;
+import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
+import 
org.apache.ignite.internal.sql.engine.rel.agg.IgniteColocatedHashAggregate;
 import org.apache.ignite.internal.sql.engine.rel.agg.IgniteMapHashAggregate;
 import org.apache.ignite.internal.sql.engine.rel.agg.IgniteReduceHashAggregate;
 import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Collation;
@@ -51,7 +53,6 @@ public class MapReduceHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
     private final String[] disableRules = {
             "SortAggregateExchangeTransposeRule",
             "MapReduceSortAggregateConverterRule",
-            "ColocatedHashAggregateConverterRule",
             "ColocatedSortAggregateConverterRule"
     };
 
@@ -81,11 +82,11 @@ public class MapReduceHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
         checkDistinctAggHash(TestCase.CASE_2_1B);
         checkDistinctAggHash(TestCase.CASE_2_2B);
 
-        checkDistinctAggHash(TestCase.CASE_2_1C);
-        checkDistinctAggHash(TestCase.CASE_2_2C);
+        checkColocatedDistinctAggHash(TestCase.CASE_2_1C);
+        checkColocatedDistinctAggHash(TestCase.CASE_2_2C);
 
-        checkDistinctAggHash(TestCase.CASE_2_1D);
-        checkDistinctAggHash(TestCase.CASE_2_2D);
+        checkColocatedDistinctAggHash(TestCase.CASE_2_1D);
+        checkColocatedDistinctAggHash(TestCase.CASE_2_2D);
     }
 
     /**
@@ -125,10 +126,10 @@ public class MapReduceHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
         checkSimpleAggWithGroupByHash(TestCase.CASE_5B);
         checkSimpleAggWithGroupByHash(TestCase.CASE_6B);
 
-        checkSimpleAggWithGroupByHash(TestCase.CASE_5C);
-        checkSimpleAggWithGroupByHash(TestCase.CASE_6C);
+        checkSimpleAggWithColocatedGroupByHash(TestCase.CASE_5C);
+        checkSimpleAggWithColocatedGroupByHash(TestCase.CASE_6C);
 
-        checkSimpleAggWithGroupByHash(TestCase.CASE_5D);
+        checkSimpleAggWithColocatedGroupByHash(TestCase.CASE_5D);
     }
 
     /**
@@ -175,11 +176,11 @@ public class MapReduceHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
         checkSimpleAggWithGroupByHash(TestCase.CASE_8_1B);
         checkSimpleAggWithGroupByHash(TestCase.CASE_8_2B);
 
-        checkSimpleAggWithGroupByHash(TestCase.CASE_8_1C);
-        checkSimpleAggWithGroupByHash(TestCase.CASE_8_2C);
+        checkSimpleAggWithColocatedGroupByHash(TestCase.CASE_8_1C);
+        checkSimpleAggWithColocatedGroupByHash(TestCase.CASE_8_2C);
 
-        checkSimpleAggWithGroupByHash(TestCase.CASE_8_1D);
-        checkSimpleAggWithGroupByHash(TestCase.CASE_8_2D);
+        checkSimpleAggWithColocatedGroupByHash(TestCase.CASE_8_1D);
+        checkSimpleAggWithColocatedGroupByHash(TestCase.CASE_8_2D);
     }
 
     /**
@@ -201,11 +202,11 @@ public class MapReduceHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
         checkAggWithGroupByIndexColumnsHash(TestCase.CASE_10B);
         checkAggWithGroupByIndexColumnsHash(TestCase.CASE_11B);
 
-        checkAggWithGroupByIndexColumnsHash(TestCase.CASE_9C);
-        checkAggWithGroupByIndexColumnsHash(TestCase.CASE_10C);
-        checkAggWithGroupByIndexColumnsHash(TestCase.CASE_11C);
+        checkAggWithColocatedGroupByIndexColumnsHash(TestCase.CASE_9C);
+        checkAggWithColocatedGroupByIndexColumnsHash(TestCase.CASE_10C);
+        checkAggWithColocatedGroupByIndexColumnsHash(TestCase.CASE_11C);
 
-        checkAggWithGroupByIndexColumnsHash(TestCase.CASE_9D);
+        checkAggWithColocatedGroupByIndexColumnsHash(TestCase.CASE_9D);
     }
 
     /**
@@ -217,8 +218,9 @@ public class MapReduceHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
 
         checkGroupWithNoAggregateHash(TestCase.CASE_12A);
         checkGroupWithNoAggregateHash(TestCase.CASE_12B);
-        checkGroupWithNoAggregateHash(TestCase.CASE_12C);
-        checkGroupWithNoAggregateHash(TestCase.CASE_12D);
+
+        checkColocatedGroupWithNoAggregateHash(TestCase.CASE_12C);
+        checkColocatedGroupWithNoAggregateHash(TestCase.CASE_12D);
     }
 
     /**
@@ -230,8 +232,9 @@ public class MapReduceHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
 
         checkGroupWithNoAggregateHash(TestCase.CASE_13A);
         checkGroupWithNoAggregateHash(TestCase.CASE_13B);
-        checkGroupWithNoAggregateHash(TestCase.CASE_13C);
-        checkGroupWithNoAggregateHash(TestCase.CASE_13D);
+
+        checkColocatedGroupWithNoAggregateHash(TestCase.CASE_13C);
+        checkColocatedGroupWithNoAggregateHash(TestCase.CASE_13D);
     }
 
     /**
@@ -263,39 +266,24 @@ public class MapReduceHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
 
         assertPlan(TestCase.CASE_16,
                 nodeOrAnyChild(isInstanceOf(IgniteSort.class)
-                        
.and(input(isInstanceOf(IgniteReduceHashAggregate.class)
-                                
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
-                                        .and(input(isTableScan("TEST")))
-                                ))
+                        
.and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
+                                .and(input(isTableScan("TEST")))
                         ))
                 ),
                 ArrayUtils.concat(disableRules, additionalRulesToDisable)
         );
 
-        assertPlan(TestCase.CASE_16A,
-                nodeOrAnyChild(isInstanceOf(IgniteSort.class)
-                        
.and(input(isInstanceOf(IgniteReduceHashAggregate.class)
-                                .and(input(isInstanceOf(IgniteExchange.class)
-                                        
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
-                                                
.and(input(isTableScan("TEST")))
-                                        ))
-                                ))
-                        ))
-                ),
-                ArrayUtils.concat(disableRules, additionalRulesToDisable)
-        );
-        assertPlan(TestCase.CASE_16B,
-                nodeOrAnyChild(isInstanceOf(IgniteSort.class)
-                        
.and(input(isInstanceOf(IgniteReduceHashAggregate.class)
-                                .and(input(isInstanceOf(IgniteExchange.class)
-                                        
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
-                                                
.and(input(isTableScan("TEST")))
-                                        ))
+        Predicate<RelNode> aggregateOverIndexScan = 
nodeOrAnyChild(isInstanceOf(IgniteSort.class)
+                .and(input(isInstanceOf(IgniteReduceHashAggregate.class)
+                        .and(input(isInstanceOf(IgniteExchange.class)
+                                
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
+                                        .and(input(isTableScan("TEST")))
                                 ))
                         ))
-                ),
-                ArrayUtils.concat(disableRules, additionalRulesToDisable)
+                ))
         );
+        assertPlan(TestCase.CASE_16A, aggregateOverIndexScan, 
ArrayUtils.concat(disableRules, additionalRulesToDisable));
+        assertPlan(TestCase.CASE_16B, aggregateOverIndexScan, 
ArrayUtils.concat(disableRules, additionalRulesToDisable));
     }
 
     /**
@@ -305,12 +293,10 @@ public class MapReduceHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
     public void emptyCollationPassThroughLimit() throws Exception {
         assertPlan(TestCase.CASE_17,
                 hasChildThat(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class)
-                        .and(input(1, 
isInstanceOf(IgniteReduceHashAggregate.class)
-                                
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
-                                        
.and(input(isInstanceOf(IgniteLimit.class)
-                                                
.and(input(isInstanceOf(IgniteSort.class)
-                                                        
.and(input(isTableScan("TEST")))
-                                                ))
+                        .and(input(1, 
isInstanceOf(IgniteColocatedHashAggregate.class)
+                                .and(input(isInstanceOf(IgniteLimit.class)
+                                        
.and(input(isInstanceOf(IgniteSort.class)
+                                                
.and(input(isTableScan("TEST")))
                                         ))
                                 ))
                         ))
@@ -318,38 +304,19 @@ public class MapReduceHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
                 disableRules
         );
 
-        assertPlan(TestCase.CASE_17A,
-                hasChildThat(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class)
-                        .and(input(1, 
isInstanceOf(IgniteReduceHashAggregate.class)
-                                
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
-                                        
.and(input(isInstanceOf(IgniteLimit.class)
-                                                
.and(input(isInstanceOf(IgniteExchange.class)
-                                                        
.and(input(isInstanceOf(IgniteSort.class)
-                                                                
.and(input(isTableScan("TEST")))
-                                                        ))
-                                                ))
-                                        ))
-                                ))
-                        ))
-                ),
-                disableRules
-        );
-        assertPlan(TestCase.CASE_17B,
-                hasChildThat(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class)
-                        .and(input(1, 
isInstanceOf(IgniteReduceHashAggregate.class)
-                                
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
-                                        
.and(input(isInstanceOf(IgniteLimit.class)
-                                                
.and(input(isInstanceOf(IgniteExchange.class)
-                                                        
.and(input(isInstanceOf(IgniteSort.class)
-                                                                
.and(input(isTableScan("TEST")))
-                                                        ))
-                                                ))
+        Predicate<RelNode> aggregateWithLimit = 
hasChildThat(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class)
+                .and(input(1, isInstanceOf(IgniteColocatedHashAggregate.class)
+                        .and(input(isInstanceOf(IgniteLimit.class)
+                                .and(input(isInstanceOf(IgniteExchange.class)
+                                        
.and(input(isInstanceOf(IgniteSort.class)
+                                                
.and(input(isTableScan("TEST")))
                                         ))
                                 ))
                         ))
-                ),
-                disableRules
+                ))
         );
+        assertPlan(TestCase.CASE_17A, aggregateWithLimit, disableRules);
+        assertPlan(TestCase.CASE_17B, aggregateWithLimit, disableRules);
     }
 
     /**
@@ -393,21 +360,15 @@ public class MapReduceHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
      */
     @Test
     public void expandDistinctAggregates() throws Exception {
-        Predicate<? extends RelNode> subtreePredicate = 
nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class)
+        Predicate<? extends RelNode> subtreePredicate = 
nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class)
                 // Check the second aggregation step contains accumulators.
                 // Plan must not contain distinct accumulators.
                 .and(hasAggregate())
                 .and(not(hasDistinctAggregate()))
-                .and(input(isInstanceOf(IgniteMapHashAggregate.class)
-                        // Check the first aggregation step is SELECT DISTINCT 
(doesn't contain any accumulators)
-                        
.and(input(isInstanceOf(IgniteReduceHashAggregate.class)
-                                .and(not(hasAggregate()))
-                                .and(hasGroups())
-                                
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
-                                        .and(not(hasAggregate()))
-                                        .and(hasGroups())
-                                ))
-                        ))
+                // Check the first aggregation step is SELECT DISTINCT 
(doesn't contain any accumulators)
+                .and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
+                        .and(not(hasAggregate()))
+                        .and(hasGroups())
                 ))
         );
 
@@ -416,21 +377,21 @@ public class MapReduceHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
                 .and(input(1, subtreePredicate))
         ), disableRules);
 
-        subtreePredicate = 
nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class)
+        subtreePredicate = 
nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class)
                 // Check the second aggregation step contains accumulators.
                 // Plan must not contain distinct accumulators.
                 .and(hasAggregate())
                 .and(not(hasDistinctAggregate()))
-                .and(input(isInstanceOf(IgniteMapHashAggregate.class)
-                        // Check the first aggregation step is SELECT DISTINCT 
(doesn't contain any accumulators)
-                        
.and(input(isInstanceOf(IgniteReduceHashAggregate.class)
-                                .and(not(hasAggregate()))
-                                .and(hasGroups())
-                                .and(input(isInstanceOf(IgniteExchange.class)
-                                        
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
-                                                .and(not(hasAggregate()))
-                                                .and(hasGroups())
-                                        ))
+                // Check the first aggregation step is SELECT DISTINCT 
(doesn't contain any accumulators)
+                .and(input(isInstanceOf(IgniteReduceHashAggregate.class)
+                        .and(not(hasAggregate()))
+                        .and(hasGroups())
+                        .and(input(isInstanceOf(IgniteExchange.class)
+                                
.and(hasDistribution(IgniteDistributions.single()))
+                                
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
+                                        .and(not(hasAggregate()))
+                                        .and(hasGroups())
+                                        
.and(input(isInstanceOf(IgniteTableScan.class)))
                                 ))
                         ))
                 ))
@@ -471,10 +432,17 @@ public class MapReduceHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
                         ))
                 ));
 
+        Predicate<RelNode> colocated = 
nodeOrAnyChild(isInstanceOf(IgniteExchange.class)
+                .and(hasDistribution(IgniteDistributions.single()))
+                .and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
+                        .and(in -> 
hasAggregates(countMap).test(in.getAggCallList()))
+                        .and(input(isTableScan("TEST")))
+                )));
+
         assertPlan(TestCase.CASE_22, nonColocated, disableRules);
         assertPlan(TestCase.CASE_22A, nonColocated, disableRules);
-        assertPlan(TestCase.CASE_22B, nonColocated, disableRules);
-        assertPlan(TestCase.CASE_22C, nonColocated, disableRules);
+        assertPlan(TestCase.CASE_22B, colocated, disableRules);
+        assertPlan(TestCase.CASE_22C, colocated, disableRules);
     }
 
     /**
@@ -482,6 +450,9 @@ public class MapReduceHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
      */
     @Test
     public void twoPhaseAvgAgg() throws Exception {
+        Predicate<AggregateCall> avgColocated = (a) ->
+                Objects.equals(a.getAggregation().getName(), "AVG") && 
a.getArgList().equals(List.of(1));
+
         Predicate<AggregateCall> sumMap = (a) ->
                 Objects.equals(a.getAggregation().getName(), "SUM") && 
a.getArgList().equals(List.of(1));
 
@@ -494,21 +465,28 @@ public class MapReduceHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
         Predicate<AggregateCall> sum0Reduce = (a) ->
                 Objects.equals(a.getAggregation().getName(), "$SUM0") && 
a.getArgList().equals(List.of(2));
 
-        Predicate<RelNode> nonColocated = 
hasChildThat(isInstanceOf(IgniteReduceHashAggregate.class)
+        Predicate<RelNode> nonColocated = 
nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class)
                 .and(in -> hasAggregates(sumReduce, 
sum0Reduce).test(in.getAggregateCalls()))
                 .and(input(isInstanceOf(IgniteExchange.class)
                         .and(hasDistribution(IgniteDistributions.single()))
                         .and(input(isInstanceOf(IgniteProject.class)
                                 
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
-                                                .and(in -> 
hasAggregates(sumMap, countMap).test(in.getAggCallList()))
-                                        )
+                                        .and(in -> hasAggregates(sumMap, 
countMap).test(in.getAggCallList()))
                                 ))
-                        ))));
+                        ))
+                )));
+
+        Predicate<RelNode> colocated = 
nodeOrAnyChild(isInstanceOf(IgniteExchange.class)
+                .and(hasDistribution(IgniteDistributions.single()))
+                .and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
+                        .and(in -> 
hasAggregates(avgColocated).test(in.getAggCallList()))
+                        .and(input(isTableScan("TEST")))
+                )));
 
         assertPlan(TestCase.CASE_23, nonColocated, disableRules);
         assertPlan(TestCase.CASE_23A, nonColocated, disableRules);
-        assertPlan(TestCase.CASE_23B, nonColocated, disableRules);
-        assertPlan(TestCase.CASE_23C, nonColocated, disableRules);
+        assertPlan(TestCase.CASE_23B, colocated, disableRules);
+        assertPlan(TestCase.CASE_23C, colocated, disableRules);
     }
 
     /**
@@ -516,19 +494,10 @@ public class MapReduceHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
      */
     @Test
     public void countDistinctGroupSetSingle() throws Exception {
-        Predicate<IgniteReduceHashAggregate> inputAgg = 
isInstanceOf(IgniteReduceHashAggregate.class)
-                .and(hasGroupSets(IgniteReduceHashAggregate::getGroupSets, 0))
-                .and(input(isInstanceOf(IgniteMapHashAggregate.class)
-                        .and(hasGroupSets(Aggregate::getGroupSets, 1))
-                ));
-
-        assertPlan(TestCase.CASE_24_1, 
nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class)
-                        
.and(hasNoGroupSets(IgniteReduceHashAggregate::getGroupSets))
-                        .and(input(isInstanceOf(IgniteMapHashAggregate.class)
-                                
.and(hasNoGroupSets(IgniteMapHashAggregate::getGroupSets))
-                                
.and(input(isInstanceOf(IgniteProject.class).and(input(inputAgg)))
-                                ))
-                        )),
+        assertPlan(TestCase.CASE_24_1, 
nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class)
+                        
.and(hasNoGroupSets(IgniteColocatedHashAggregate::getGroupSets))
+                        .and(input(isTableScan("TEST")))
+                ),
                 disableRules);
     }
 
@@ -539,9 +508,9 @@ public class MapReduceHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
     public void countDistinctGroupSetHash() throws Exception {
         checkCountDistinctHash(TestCase.CASE_24_1A);
         checkCountDistinctHash(TestCase.CASE_24_1B);
-        checkCountDistinctHash(TestCase.CASE_24_1C);
+        checkCountDistinctHashColocated(TestCase.CASE_24_1C);
         checkCountDistinctHash(TestCase.CASE_24_1D);
-        checkCountDistinctHash(TestCase.CASE_24_1E);
+        checkCountDistinctHashColocated(TestCase.CASE_24_1E);
     }
 
     /**
@@ -566,11 +535,9 @@ public class MapReduceHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
 
     private void checkSimpleAggSingle(TestCase testCase) throws Exception {
         assertPlan(testCase,
-                nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class)
-                        .and(input(isInstanceOf(IgniteMapHashAggregate.class)
-                                .and(hasAggregate())
-                                .and(input(isTableScan("TEST")))
-                        ))
+                nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class)
+                        .and(hasAggregate())
+                        .and(input(isTableScan("TEST")))
                 ),
                 disableRules
         );
@@ -592,12 +559,10 @@ public class MapReduceHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
 
     private void checkSimpleAggWithGroupBySingle(TestCase testCase) throws 
Exception {
         assertPlan(testCase,
-                nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class)
-                        .and(input(isInstanceOf(IgniteMapHashAggregate.class)
-                                .and(hasAggregate())
-                                .and(hasGroups())
-                                .and(input(isTableScan("TEST")))
-                        ))
+                nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class)
+                        .and(hasAggregate())
+                        .and(hasGroups())
+                        .and(input(isTableScan("TEST")))
                 ),
                 disableRules
         );
@@ -618,21 +583,15 @@ public class MapReduceHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
         );
     }
 
-    private void checkDistinctAggSingle(TestCase testCase) throws Exception {
+    private void checkSimpleAggWithColocatedGroupByHash(TestCase testCase) 
throws Exception {
         assertPlan(testCase,
-                nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class)
-                        .and(hasAggregate())
-                        .and(not(hasDistinctAggregate()))
-                        .and(input(isInstanceOf(IgniteMapHashAggregate.class)
-                                .and(hasAggregate())
-                                .and(not(hasDistinctAggregate()))
-                                
.and(input(isInstanceOf(IgniteReduceHashAggregate.class)
-                                        .and(not(hasAggregate()))
+                nodeOrAnyChild(isInstanceOf(IgniteExchange.class)
+                        .and(hasDistribution(IgniteDistributions.single()))
+                        .and(input(isInstanceOf(IgniteProject.class)
+                                
.and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
+                                        .and(hasAggregate())
                                         .and(hasGroups())
-                                        
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
-                                                .and(not(hasAggregate()))
-                                                
.and(input(isTableScan("TEST")))
-                                        ))
+                                        .and(input(isTableScan("TEST")))
                                 ))
                         ))
                 ),
@@ -640,22 +599,28 @@ public class MapReduceHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
         );
     }
 
+    private void checkDistinctAggSingle(TestCase testCase) throws Exception {
+        assertPlan(testCase,
+                nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class)
+                        .and(hasDistinctAggregate())
+                        .and(input(isTableScan("TEST")))
+                ),
+                disableRules
+        );
+    }
+
     private void checkDistinctAggHash(TestCase testCase) throws Exception {
         assertPlan(testCase,
-                nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class)
+                nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class)
                         .and(hasAggregate())
                         .and(not(hasDistinctAggregate()))
-                        .and(input(isInstanceOf(IgniteMapHashAggregate.class)
-                                .and(hasAggregate())
-                                .and(not(hasDistinctAggregate()))
-                                
.and(input(isInstanceOf(IgniteReduceHashAggregate.class)
-                                        .and(not(hasAggregate()))
-                                        .and(hasGroups())
-                                        
.and(input(isInstanceOf(IgniteExchange.class)
-                                                
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
-                                                        
.and(not(hasAggregate()))
-                                                        
.and(input(isTableScan("TEST")))
-                                                ))
+                        
.and(input(isInstanceOf(IgniteReduceHashAggregate.class)
+                                .and(not(hasAggregate()))
+                                .and(hasGroups())
+                                .and(input(isInstanceOf(IgniteExchange.class)
+                                        
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
+                                                .and(not(hasAggregate()))
+                                                
.and(input(isTableScan("TEST")))
                                         ))
                                 ))
                         ))),
@@ -663,22 +628,27 @@ public class MapReduceHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
         );
     }
 
-    private void checkDistinctAggWithGroupBySingle(TestCase testCase) throws 
Exception {
+    private void checkColocatedDistinctAggHash(TestCase testCase) throws 
Exception {
         assertPlan(testCase,
-                nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class)
-                        .and(hasAggregate())
+                nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class)
                         .and(not(hasDistinctAggregate()))
-                        .and(input(isInstanceOf(IgniteMapHashAggregate.class)
-                                
.and(input(isInstanceOf(IgniteReduceHashAggregate.class)
+                        .and(input(isInstanceOf(IgniteExchange.class)
+                                
.and(hasDistribution(IgniteDistributions.single()))
+                                
.and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
                                         .and(not(hasAggregate()))
                                         .and(hasGroups())
-                                        
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
-                                                .and(not(hasAggregate()))
-                                                .and(hasGroups())
-                                                
.and(input(isTableScan("TEST")))
-                                        ))
+                                        .and(input(isTableScan("TEST")))
                                 ))
-                        ))
+                        ))),
+                disableRules
+        );
+    }
+
+    private void checkDistinctAggWithGroupBySingle(TestCase testCase) throws 
Exception {
+        assertPlan(testCase,
+                nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class)
+                        .and(hasDistinctAggregate())
+                        .and(input(isTableScan("TEST")))
                 ),
                 disableRules
         );
@@ -686,19 +656,17 @@ public class MapReduceHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
 
     private void checkDistinctAggWithGroupByHash(TestCase testCase) throws 
Exception {
         assertPlan(testCase,
-                nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class)
+                nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class)
                         .and(hasAggregate())
                         .and(not(hasDistinctAggregate()))
-                        .and(input(isInstanceOf(IgniteMapHashAggregate.class)
-                                
.and(input(isInstanceOf(IgniteReduceHashAggregate.class)
-                                        .and(not(hasAggregate()))
-                                        .and(hasGroups())
-                                        
.and(input(isInstanceOf(IgniteExchange.class)
-                                                
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
-                                                        
.and(not(hasAggregate()))
-                                                        .and(hasGroups())
-                                                        
.and(input(isTableScan("TEST")))
-                                                ))
+                        
.and(input(isInstanceOf(IgniteReduceHashAggregate.class)
+                                .and(not(hasAggregate()))
+                                .and(hasGroups())
+                                .and(input(isInstanceOf(IgniteExchange.class)
+                                        
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
+                                                .and(not(hasAggregate()))
+                                                .and(hasGroups())
+                                                
.and(input(isTableScan("TEST")))
                                         ))
                                 ))
                         ))
@@ -709,34 +677,22 @@ public class MapReduceHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
 
     private void checkDistinctAggWithColocatedGroupByHash(TestCase testCase) 
throws Exception {
         assertPlan(testCase,
-                nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class)
-                        .and(hasAggregate())
-                        .and(not(hasDistinctAggregate()))
-                        .and(input(isInstanceOf(IgniteMapHashAggregate.class)
-                                
.and(input(isInstanceOf(IgniteReduceHashAggregate.class)
-                                        .and(not(hasAggregate()))
-                                        .and(hasGroups())
-                                        
.and(input(isInstanceOf(IgniteExchange.class)
-                                                
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
-                                                        
.and(not(hasAggregate()))
-                                                        .and(hasGroups())
-                                                        
.and(input(isTableScan("TEST")))
-                                                ))
-                                        ))
-                                ))
-                        ))
-                ),
+                nodeOrAnyChild(isInstanceOf(IgniteExchange.class)
+                        .and(hasDistribution(IgniteDistributions.single()))
+                        
.and(nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class)
+                                .and(hasDistinctAggregate())
+                                .and(hasGroups())
+                                .and(input(isTableScan("TEST")))
+                        ))),
                 disableRules
         );
     }
 
     private void checkAggWithGroupByIndexColumnsSingle(TestCase testCase) 
throws Exception {
         assertPlan(testCase,
-                nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class)
-                        .and(input(isInstanceOf(IgniteMapHashAggregate.class)
-                                .and(hasAggregate())
-                                .and(input(isTableScan("TEST")))
-                        ))
+                nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class)
+                        .and(hasAggregate())
+                        .and(input(isTableScan("TEST")))
                 ),
                 disableRules
         );
@@ -756,16 +712,27 @@ public class MapReduceHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
         );
     }
 
+    private void checkAggWithColocatedGroupByIndexColumnsHash(TestCase 
testCase) throws Exception {
+        assertPlan(testCase,
+                nodeOrAnyChild(isInstanceOf(IgniteExchange.class)
+                        .and(hasDistribution(IgniteDistributions.single()))
+                        .and(input(isInstanceOf(IgniteProject.class)
+                                
.and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
+                                        .and(hasAggregate())
+                                        .and(input(isTableScan("TEST")))
+                                ))
+                        ))
+                ),
+                disableRules
+        );
+    }
+
     private void checkGroupWithNoAggregateSingle(TestCase testCase) throws 
Exception {
         assertPlan(testCase,
-                nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class)
+                nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class)
                         .and(not(hasAggregate()))
                         .and(hasGroups())
-                        .and(input(isInstanceOf(IgniteMapHashAggregate.class)
-                                .and(not(hasAggregate()))
-                                .and(hasGroups())
-                                .and(input(isTableScan("TEST")))
-                        ))
+                        .and(input(isTableScan("TEST")))
                 ),
                 disableRules
         );
@@ -788,16 +755,27 @@ public class MapReduceHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
         );
     }
 
+
+    private void checkColocatedGroupWithNoAggregateHash(TestCase testCase) 
throws Exception {
+        assertPlan(testCase,
+                nodeOrAnyChild(isInstanceOf(IgniteExchange.class)
+                        
.and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
+                                .and(not(hasAggregate()))
+                                .and(hasGroups())
+                                .and(input(isTableScan("TEST")))
+                        ))
+                ),
+                disableRules
+        );
+    }
+
     private void checkGroupsWithOrderByGroupColumnsSingle(TestCase testCase, 
RelCollation collation) throws Exception {
         assertPlan(testCase,
                 isInstanceOf(IgniteSort.class)
                         .and(s -> s.collation().equals(collation))
-                        .and(input(isInstanceOf(IgniteProject.class)
-                                
.and(input(isInstanceOf(IgniteReduceHashAggregate.class)
-                                        
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
-                                                
.and(input(isTableScan("TEST")))
-                                        ))
-                                ))
+                        
.and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
+                                .and(input(isTableScan("TEST")))
+
                         )),
                 disableRules
         );
@@ -820,7 +798,6 @@ public class MapReduceHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
         );
     }
 
-
     private void checkCountDistinctHash(TestCase testCase) throws Exception {
         Predicate<IgniteReduceHashAggregate> inputAgg = 
isInstanceOf(IgniteReduceHashAggregate.class)
                 .and(hasGroupSets(IgniteReduceHashAggregate::getGroupSets, 0))
@@ -831,27 +808,33 @@ public class MapReduceHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
                         ))
                 ));
 
-        assertPlan(testCase, 
nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class)
-                        
.and(hasNoGroupSets(IgniteReduceHashAggregate::getGroupSets))
-                        .and(input(isInstanceOf(IgniteMapHashAggregate.class)
-                                
.and(hasNoGroupSets(IgniteMapHashAggregate::getGroupSets))
-                                
.and(input(isInstanceOf(IgniteProject.class).and(input(inputAgg)))
-                                ))
+        assertPlan(testCase, 
nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class)
+                        .and(hasNoGroupSets(Aggregate::getGroupSets))
+                        
.and(input(isInstanceOf(IgniteProject.class).and(input(inputAgg)))
                         )),
                 disableRules);
     }
 
+    private void checkCountDistinctHashColocated(TestCase testCase) throws 
Exception {
+        assertPlan(testCase, 
nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class)
+                        .and(hasNoGroupSets(Aggregate::getGroupSets))
+                        .and(input(isInstanceOf(IgniteExchange.class)
+                                
.and(hasDistribution(IgniteDistributions.single()))
+                                
.and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
+                                        
.and(hasGroupSets(Aggregate::getGroupSets, 1))
+                                ))
+                        ))),
+                disableRules);
+    }
+
     private void checkDerivedCollationWithOrderByGroupColumnSingle(TestCase 
testCase) throws Exception {
         RelCollation requiredCollation = 
RelCollations.of(TraitUtils.createFieldCollation(0, 
Collation.DESC_NULLS_FIRST));
 
         assertPlan(testCase, isInstanceOf(IgniteSort.class)
-                .and(hasCollation(requiredCollation))
-                
.and(nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class)
-                        .and(hasAggregate())
-                        .and(input(isInstanceOf(IgniteMapHashAggregate.class)
+                        .and(hasCollation(requiredCollation))
+                        
.and(nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class)
                                 .and(hasAggregate())
-                        ))
-                )),
+                        )),
                 disableRules);
     }
 
@@ -859,16 +842,16 @@ public class MapReduceHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
         RelCollation requiredCollation = 
RelCollations.of(TraitUtils.createFieldCollation(0, 
Collation.DESC_NULLS_FIRST));
 
         assertPlan(testCase, isInstanceOf(IgniteSort.class)
-                .and(hasCollation(requiredCollation))
-                
.and(nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class)
-                        .and(hasAggregate())
-                        .and(input(isInstanceOf(IgniteExchange.class)
-                                
.and(hasDistribution(IgniteDistributions.single()))
-                                
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
-                                        .and(hasAggregate())
+                        .and(hasCollation(requiredCollation))
+                        
.and(nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class)
+                                .and(hasAggregate())
+                                .and(input(isInstanceOf(IgniteExchange.class)
+                                        
.and(hasDistribution(IgniteDistributions.single()))
+                                        
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
+                                                .and(hasAggregate())
+                                        ))
                                 ))
-                        ))
-                )),
+                        )),
                 disableRules);
     }
 
@@ -880,11 +863,7 @@ public class MapReduceHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
         assertPlan(testCase,
                 isInstanceOf(IgniteSort.class)
                         .and(hasCollation(outputCollation))
-                        .and(input(isInstanceOf(IgniteProject.class)
-                                
.and(input(isInstanceOf(IgniteReduceHashAggregate.class)
-                                        
.and(input(isInstanceOf(IgniteMapHashAggregate.class)))
-                                ))
-                        )),
+                        
.and(input(isInstanceOf(IgniteColocatedHashAggregate.class))),
                 disableRules);
     }
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java
index 76fca16b6c..e570a0e384 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java
@@ -52,6 +52,7 @@ import org.junit.jupiter.api.Test;
  */
 public class MapReduceSortAggregatePlannerTest extends 
AbstractAggregatePlannerTest {
     private final String[] disableRules = {
+            "HashAggregateExchangeTransposeRule",
             "MapReduceHashAggregateConverterRule",
             "ColocatedHashAggregateConverterRule",
     };

Reply via email to