This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch aggregates-rules
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit d1831fd91087cb8d586378759ce35864f714d98d
Author: amashenkov <[email protected]>
AuthorDate: Tue Apr 9 22:28:42 2024 +0300

    Add sorted aggregate push down rule
---
 .../internal/sql/engine/prepare/PlannerPhase.java  |   5 +-
 .../rel/agg/IgniteColocatedAggregateBase.java      |  10 +
 .../sql/engine/rel/agg/MapReduceAggregates.java    | 262 +++++++++++
 .../rule/SortAggregateExchangeTransposeRule.java   | 212 +++++++++
 .../ignite/internal/sql/engine/util/HintUtils.java |   4 +-
 .../sql/engine/planner/AggregatePlannerTest.java   |  31 +-
 .../planner/ColocatedHashAggregatePlannerTest.java |   1 +
 .../planner/ColocatedSortAggregatePlannerTest.java |   1 +
 .../planner/MapReduceHashAggregatePlannerTest.java |   1 +
 .../planner/MapReduceSortAggregatePlannerTest.java | 492 +++++++++------------
 10 files changed, 718 insertions(+), 301 deletions(-)

diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
index 71229e08f0..e5092dd2b1 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
@@ -50,12 +50,14 @@ import 
org.apache.ignite.internal.sql.engine.rule.FilterConverterRule;
 import 
org.apache.ignite.internal.sql.engine.rule.FilterSpoolMergeToHashIndexSpoolRule;
 import 
org.apache.ignite.internal.sql.engine.rule.FilterSpoolMergeToSortedIndexSpoolRule;
 import org.apache.ignite.internal.sql.engine.rule.HashAggregateConverterRule;
+import 
org.apache.ignite.internal.sql.engine.rule.HashAggregateExchangeTransposeRule;
 import org.apache.ignite.internal.sql.engine.rule.LogicalScanConverterRule;
 import org.apache.ignite.internal.sql.engine.rule.MergeJoinConverterRule;
 import org.apache.ignite.internal.sql.engine.rule.NestedLoopJoinConverterRule;
 import org.apache.ignite.internal.sql.engine.rule.ProjectConverterRule;
 import org.apache.ignite.internal.sql.engine.rule.SetOpConverterRule;
 import org.apache.ignite.internal.sql.engine.rule.SortAggregateConverterRule;
+import 
org.apache.ignite.internal.sql.engine.rule.SortAggregateExchangeTransposeRule;
 import org.apache.ignite.internal.sql.engine.rule.SortConverterRule;
 import org.apache.ignite.internal.sql.engine.rule.SortExchangeTransposeRule;
 import 
org.apache.ignite.internal.sql.engine.rule.TableFunctionScanConverterRule;
@@ -235,7 +237,8 @@ public enum PlannerPhase {
             HashAggregateConverterRule.COLOCATED,
             HashAggregateConverterRule.MAP_REDUCE,
             SortAggregateConverterRule.COLOCATED,
-            SortAggregateConverterRule.MAP_REDUCE,
+            SortAggregateExchangeTransposeRule.INSTANCE,
+            // SortAggregateConverterRule.MAP_REDUCE,
             SetOpConverterRule.COLOCATED_MINUS,
             SetOpConverterRule.MAP_REDUCE_MINUS,
             SetOpConverterRule.COLOCATED_INTERSECT,
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/IgniteColocatedAggregateBase.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/IgniteColocatedAggregateBase.java
index 565193b70b..e1f42e90da 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/IgniteColocatedAggregateBase.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/IgniteColocatedAggregateBase.java
@@ -84,6 +84,16 @@ public abstract class IgniteColocatedAggregateBase extends 
IgniteAggregate imple
             return List.of(Pair.of(nodeTraits.replace(inDistribution), 
inputTraits));
         }
 
+        // An aggregate with an empty group set produces a single group, so it requires single distribution.
+        if (groupSet.isEmpty()) {
+            return List.of(
+                    Pair.of(
+                            nodeTraits.replace(IgniteDistributions.single()),
+                            
List.of(inputTraits.get(0).replace(IgniteDistributions.single()))
+                    )
+            );
+        }
+
         // Otherwise if stream is distributed by hash function, and grouping 
columns is a super-set of
         // distribution keys, we can use colocated aggregate as well
         if (inDistribution.getType() == RelDistribution.Type.HASH_DISTRIBUTED) 
{
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/MapReduceAggregates.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/MapReduceAggregates.java
index ec8c2819e4..1e7efc995f 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/MapReduceAggregates.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/MapReduceAggregates.java
@@ -47,9 +47,11 @@ import org.apache.calcite.sql.type.SqlTypeUtil;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.mapping.Mapping;
 import org.apache.calcite.util.mapping.Mappings;
+import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
 import org.apache.ignite.internal.sql.engine.rel.IgniteProject;
 import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
 import org.apache.ignite.internal.sql.engine.sql.fun.IgniteSqlOperatorTable;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.jetbrains.annotations.TestOnly;
@@ -344,6 +346,265 @@ public class MapReduceAggregates {
         return new IgniteProject(agg.getCluster(), reduce.getTraitSet(), 
reduce, projectionList, agg.getRowType());
     }
 
+    /**
+     * Converts the given colocated sorted aggregate over exchange operator 
into a MAP/REDUCE sorted aggregate implementation.
+     *
+     * @param agg Colocated sorted aggregate.
+     * @param exchange Exchange operator.
+     * @param builder Builder to create implementations of MAP and REDUCE 
phases.
+     * @param fieldMappingOnReduce Mapping to be applied to group sets on 
REDUCE phase.
+     *
+     * @return A physical node tree that implements the given aggregate.
+     */
+    public static IgniteRel buildAggregates(
+            IgniteColocatedAggregateBase agg,
+            IgniteExchange exchange,
+            AggregateRelBuilder builder,
+            Mapping fieldMappingOnReduce
+    ) {
+
+        //
+        // To implement a MAP/REDUCE aggregate, the colocated aggregate is transformed 
into
+        // a map aggregate node, a reduce aggregate node, and an optional 
project node
+        // (since some aggregate can be split into multiple ones, or require 
some additional work after REDUCE phase,
+        // to combine the results).
+        //
+        // SELECT c1, MIN(c2), COUNT(c3) FROM test GROUP BY c1, c2
+        //
+        // MAP      [c1, c2, map_agg1, map_agg2]
+        // REDUCE   [c1, c2, reduce_agg1, reduce_agg2]
+        // PROJECT: [c1, c2, expr_agg1, expr_agg2]
+        //
+        // =>
+        //
+        // {map: map_agg1, reduce: reduce_agg1, expr: expr_agg1, ..}
+        // {map: map_agg2, reduce: reduce_agg2, expr: expr_agg2, ..}
+        //
+
+        // Create a list of descriptors for map/reduce version of the given 
arguments.
+        // This list is later used to create MAP/REDUCE version of each 
aggregate.
+
+        List<MapReduceAgg> mapReduceAggs = new 
ArrayList<>(agg.getAggCallList().size());
+        // groupSet includes all columns from GROUP BY/GROUPING SETS clauses.
+        int argumentOffset = agg.getGroupSet().cardinality();
+
+        // MAP PHASE AGGREGATE
+
+        List<AggregateCall> mapAggCalls = new 
ArrayList<>(agg.getAggCallList().size());
+
+        RelNode input = exchange.getInput();
+        for (AggregateCall call : agg.getAggCallList()) {
+            // See ReturnTypes::AVG_AGG_FUNCTION: the result type of an aggregate 
with no grouping or with filtering can be nullable.
+            boolean canBeNull = agg.getGroupCount() == 0 || call.hasFilter();
+
+            MapReduceAgg mapReduceAgg = createMapReduceAggCall(
+                    Commons.cluster(),
+                    call,
+                    argumentOffset,
+                    input.getRowType(),
+                    canBeNull
+            );
+            argumentOffset += mapReduceAgg.reduceCalls.size();
+            mapReduceAggs.add(mapReduceAgg);
+
+            mapAggCalls.addAll(mapReduceAgg.mapCalls);
+        }
+
+        // MAP phase should have at least as many aggregate calls as the 
original aggregate.
+        // Otherwise there is a bug, because some aggregates were ignored.
+        assert mapAggCalls.size() >= agg.getAggCallList().size() :
+                format("The number of MAP aggregates is not correct. Original: 
{}\nMAP: {}", agg.getAggCallList(), mapAggCalls);
+
+        RelNode map = builder.makeMapAgg(
+                agg.getCluster(),
+                input,
+                agg.getGroupSet(),
+                agg.getGroupSets(),
+                mapAggCalls
+        );
+
+        //
+        // REDUCE INPUT PROJECTION
+        //
+
+        RelDataTypeFactory.Builder reduceType = new 
Builder(Commons.typeFactory());
+
+        int groupByColumns = agg.getGroupSet().cardinality();
+        boolean sameAggsForBothPhases = true;
+
+        // Build row type for input of REDUCE phase.
+        // It consists of columns from agg.groupSet and aggregate expressions.
+
+        for (int i = 0; i < groupByColumns; i++) {
+            List<RelDataTypeField> outputRowFields = 
agg.getRowType().getFieldList();
+            RelDataType type = outputRowFields.get(i).getType();
+            reduceType.add("f" + reduceType.getFieldCount(), type);
+        }
+
+        RexBuilder rexBuilder = agg.getCluster().getRexBuilder();
+        IgniteTypeFactory typeFactory = (IgniteTypeFactory) 
agg.getCluster().getTypeFactory();
+
+        List<RexNode> reduceInputExprs = new ArrayList<>();
+
+        for (int i = 0; i < map.getRowType().getFieldList().size(); i++) {
+            RelDataType type = 
map.getRowType().getFieldList().get(i).getType();
+            RexInputRef ref = new RexInputRef(i, type);
+            reduceInputExprs.add(ref);
+        }
+
+        // Build a list of projections for reduce operator,
+        // if all projections are identity, it is not necessary
+        // to create a projection between MAP and REDUCE operators.
+
+        boolean additionalProjectionsForReduce = false;
+
+        for (int i = 0, argOffset = 0; i < mapReduceAggs.size(); i++) {
+            MapReduceAgg mapReduceAgg = mapReduceAggs.get(i);
+            int argIdx = groupByColumns + argOffset;
+
+            for (int j = 0; j < mapReduceAgg.reduceCalls.size(); j++) {
+                RexNode projExpr = 
mapReduceAgg.makeReduceInputExpr.makeExpr(rexBuilder, map, List.of(argIdx), 
typeFactory);
+                reduceInputExprs.set(argIdx, projExpr);
+
+                if (mapReduceAgg.makeReduceInputExpr != USE_INPUT_FIELD) {
+                    additionalProjectionsForReduce = true;
+                }
+
+                argIdx += 1;
+            }
+
+            argOffset += mapReduceAgg.reduceCalls.size();
+        }
+
+        RelNode reduceInputNode;
+        if (additionalProjectionsForReduce) {
+            RelDataTypeFactory.Builder projectRow = new 
Builder(agg.getCluster().getTypeFactory());
+
+            for (int i = 0; i < reduceInputExprs.size(); i++) {
+                RexNode rexNode = reduceInputExprs.get(i);
+                projectRow.add(String.valueOf(i), rexNode.getType());
+            }
+
+            RelDataType projectRowType = projectRow.build();
+
+            reduceInputNode = builder.makeProject(agg.getCluster(), map, 
reduceInputExprs, projectRowType);
+        } else {
+            reduceInputNode = map;
+        }
+
+        // ADD EXCHANGE
+        reduceInputNode = new IgniteExchange(
+                agg.getCluster(),
+                map.getTraitSet().replace(IgniteDistributions.single()),
+                reduceInputNode,
+                IgniteDistributions.single()
+        );
+
+        //
+        // REDUCE PHASE AGGREGATE
+        //
+        // Build a list of aggregate calls for REDUCE phase.
+        // Build a list of projections (arg-list, expr) that accept reduce 
phase and combine/collect/cast results.
+
+        List<AggregateCall> reduceAggCalls = new ArrayList<>();
+        List<Map.Entry<List<Integer>, MakeReduceExpr>> projection = new 
ArrayList<>(mapReduceAggs.size());
+
+        for (MapReduceAgg mapReduceAgg : mapReduceAggs) {
+            // Update row type returned by REDUCE node.
+            int i = 0;
+            for (AggregateCall reduceCall : mapReduceAgg.reduceCalls) {
+                reduceType.add("f" + i + "_" + reduceType.getFieldCount(), 
reduceCall.getType());
+                reduceAggCalls.add(reduceCall);
+                i += 1;
+            }
+
+            // Update projection list
+            List<Integer> reduceArgList = mapReduceAgg.argList;
+            MakeReduceExpr projectionExpr = mapReduceAgg.makeReduceOutputExpr;
+            projection.add(new SimpleEntry<>(reduceArgList, projectionExpr));
+
+            if (projectionExpr != USE_INPUT_FIELD) {
+                sameAggsForBothPhases = false;
+            }
+        }
+
+        RelDataType reduceTypeToUse;
+        if (sameAggsForBothPhases) {
+            reduceTypeToUse = agg.getRowType();
+        } else {
+            reduceTypeToUse = reduceType.build();
+        }
+
+        // If the number of aggregates on MAP phase is larger than the number 
of aggregates on REDUCE phase,
+        // assume that some of MAP aggregates are not used by REDUCE phase and 
this is a bug.
+        //
+        // NOTE: In general case REDUCE phase can use more aggregates than MAP 
phase,
+        // but at the moment there is no support for such aggregates.
+        assert mapAggCalls.size() <= reduceAggCalls.size() :
+                format("The number of MAP/REDUCE aggregates is not correct. 
MAP: {}\nREDUCE: {}", mapAggCalls, reduceAggCalls);
+
+        // Apply mapping to groupSet/groupSets on REDUCE phase.
+        ImmutableBitSet groupSetOnReduce = 
Mappings.apply(fieldMappingOnReduce, agg.getGroupSet());
+        List<ImmutableBitSet> groupSetsOnReduce = agg.getGroupSets().stream()
+                .map(g -> Mappings.apply(fieldMappingOnReduce, g))
+                .collect(Collectors.toList());
+
+        IgniteRel reduce = builder.makeReduceAgg(
+                agg.getCluster(),
+                reduceInputNode,
+                groupSetOnReduce,
+                groupSetsOnReduce,
+                reduceAggCalls,
+                reduceTypeToUse
+        );
+
+        //
+        // FINAL PROJECTION
+        //
+        // if aggregate MAP phase uses the same aggregates as REDUCE phase,
+        // there is no need to add a projection because no additional actions 
are required to compute final results.
+        if (sameAggsForBothPhases) {
+            return reduce;
+        }
+
+        List<RexNode> projectionList = new ArrayList<>(projection.size() + 
groupByColumns);
+
+        // Projection list returned by AggregateNode consists of columns from 
GROUP BY clause
+        // and expressions that represent aggregate calls.
+        // In case of MAP/REDUCE those expressions should compute final 
results for each MAP/REDUCE aggregate.
+
+        int i = 0;
+        for (; i < groupByColumns; i++) {
+            List<RelDataTypeField> outputRowFields = 
agg.getRowType().getFieldList();
+            RelDataType type = outputRowFields.get(i).getType();
+            RexInputRef ref = new RexInputRef(i, type);
+            projectionList.add(ref);
+        }
+
+        for (Map.Entry<List<Integer>, MakeReduceExpr> expr : projection) {
+            RexNode resultExpr = expr.getValue().makeExpr(rexBuilder, reduce, 
expr.getKey(), typeFactory);
+            projectionList.add(resultExpr);
+        }
+
+        assert projectionList.size() == agg.getRowType().getFieldList().size() 
:
+                format("Projection size does not match. Expected: {} but got 
{}",
+                        agg.getRowType().getFieldList().size(), 
projectionList.size());
+
+        for (i = 0;  i < projectionList.size(); i++) {
+            RexNode resultExpr = projectionList.get(i);
+            List<RelDataTypeField> outputRowFields = 
agg.getRowType().getFieldList();
+
+            // Put assertion here so we can see an expression that caused a 
type mismatch,
+            // since Project::isValid only shows types.
+            assert 
resultExpr.getType().equals(outputRowFields.get(i).getType()) :
+                    format("Type at position#{} does not match. Expected: {} 
but got {}.\nREDUCE aggregates: {}\nRow: {}.\nExpr: {}",
+                            i, resultExpr.getType(), 
outputRowFields.get(i).getType(), reduceAggCalls, outputRowFields, resultExpr);
+
+        }
+
+        return new IgniteProject(agg.getCluster(), reduce.getTraitSet(), 
reduce, projectionList, agg.getRowType());
+    }
+
     /**
      * Creates a MAP/REDUCE details for this call.
      */
@@ -671,4 +932,5 @@ public class MapReduceAggregates {
                 reduceOutputExpr
         );
     }
+
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/SortAggregateExchangeTransposeRule.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/SortAggregateExchangeTransposeRule.java
new file mode 100644
index 0000000000..f9ee9ef9e3
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/SortAggregateExchangeTransposeRule.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.rule;
+
+import static 
org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates.canBeImplementedAsMapReduce;
+import static 
org.apache.ignite.internal.sql.engine.trait.IgniteDistributions.single;
+import static 
org.apache.ignite.internal.sql.engine.trait.TraitUtils.distribution;
+import static 
org.apache.ignite.internal.sql.engine.util.PlanUtils.complexDistinctAgg;
+import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+
+import java.util.List;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelDistribution.Type;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.mapping.Mapping;
+import org.apache.ignite.internal.sql.engine.rel.IgniteConvention;
+import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
+import org.apache.ignite.internal.sql.engine.rel.IgniteProject;
+import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
+import 
org.apache.ignite.internal.sql.engine.rel.agg.IgniteColocatedSortAggregate;
+import org.apache.ignite.internal.sql.engine.rel.agg.IgniteMapSortAggregate;
+import org.apache.ignite.internal.sql.engine.rel.agg.IgniteReduceSortAggregate;
+import org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates;
+import 
org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates.AggregateRelBuilder;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
+import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.sql.engine.util.HintUtils;
+import org.immutables.value.Value;
+
+/**
+ * A rule that pushes {@link IgniteColocatedSortAggregate} node under {@link 
IgniteExchange} if possible, otherwise, splits into map-reduce
+ * phases.
+ */
[email protected]
+public class SortAggregateExchangeTransposeRule extends 
RelRule<SortAggregateExchangeTransposeRule.Config> {
+    public static final RelOptRule INSTANCE = 
SortAggregateExchangeTransposeRule.Config.INSTANCE.toRule();
+
+    
SortAggregateExchangeTransposeRule(SortAggregateExchangeTransposeRule.Config 
cfg) {
+        super(cfg);
+    }
+
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+        IgniteColocatedSortAggregate aggregate = call.rel(0);
+        IgniteExchange exchange = call.rel(1);
+
+        // Conditions already checked by SortAggregateConverterRule
+        assert !HintUtils.isExpandDistinctAggregate(aggregate);
+        assert aggregate.getGroupSets().size() == 1;
+
+        return hashAlike(distribution(exchange.getInput()))
+                && exchange.distribution() == single()
+                && (canBePushedDown(aggregate, exchange) || 
canBeConvertedToMapReduce(aggregate));
+    }
+
+    /**
+     * Returns {@code true} if an aggregate groups by distribution columns and 
therefore, can be pushed down under Exchange as-is,
+     * {@code false} otherwise.
+     */
+    private static boolean canBePushedDown(IgniteColocatedSortAggregate 
aggregate, IgniteExchange exchange) {
+        return distribution(exchange.getInput()).getType() == 
Type.HASH_DISTRIBUTED
+                && !nullOrEmpty(aggregate.getGroupSet())
+                && 
ImmutableBitSet.of(distribution(exchange.getInput()).getKeys()).equals(aggregate.getGroupSet());
+    }
+
+    /**
+     * Returns {@code true} if an aggregate can be converted to map-reduce 
phases, {@code false} otherwise.
+     */
+    private static boolean 
canBeConvertedToMapReduce(IgniteColocatedSortAggregate aggregate) {
+        return canBeImplementedAsMapReduce(aggregate.getAggCallList())
+                && !complexDistinctAgg(aggregate.getAggCallList());
+    }
+
+    private static boolean hashAlike(IgniteDistribution distribution) {
+        return distribution.getType() == Type.HASH_DISTRIBUTED
+                || distribution.getType() == Type.RANDOM_DISTRIBUTED;
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        IgniteColocatedSortAggregate agg = call.rel(0);
+        IgniteExchange exchange = call.rel(1);
+
+        RelOptCluster cluster = agg.getCluster();
+        RelCollation collation = agg.collation();
+
+        if (canBePushedDown(agg, exchange)) {
+            assert agg.getGroupSets().size() == 1;
+            assert !agg.collation().getKeys().isEmpty();
+
+            IgniteExchange relNode = new IgniteExchange(
+                    cluster,
+                    exchange.getTraitSet().replace(collation),
+                    new IgniteColocatedSortAggregate(
+                            cluster,
+                            agg.getTraitSet()
+                                    .replace(collation)
+                                    
.replace(distribution(exchange.getInput())),
+                            exchange.getInput(),
+                            agg.getGroupSet(),
+                            agg.getGroupSets(),
+                            agg.getAggCallList()
+                    ),
+                    exchange.distribution()
+            );
+
+            call.transformTo(relNode);
+
+            return;
+        }
+
+        // Create mapping to adjust fields on REDUCE phase.
+        Mapping fieldMappingOnReduce = 
Commons.trimmingMapping(agg.getGroupSet().length(), agg.getGroupSet());
+
+        // Adjust columns in output collation.
+        RelCollation outputCollation = collation.apply(fieldMappingOnReduce);
+
+        RelTraitSet inTraits = 
cluster.traitSetOf(IgniteConvention.INSTANCE).replace(collation);
+        RelTraitSet outTraits = 
cluster.traitSetOf(IgniteConvention.INSTANCE).replace(outputCollation);
+
+        AggregateRelBuilder relBuilder = new AggregateRelBuilder() {
+            @Override
+            public IgniteRel makeMapAgg(RelOptCluster cluster, RelNode input, 
ImmutableBitSet groupSet,
+                    List<ImmutableBitSet> groupSets, List<AggregateCall> 
aggregateCalls) {
+
+                return new IgniteMapSortAggregate(
+                        cluster,
+                        outTraits.replace(distribution(input)),
+                        convert(input, inTraits.replace(distribution(input))),
+                        groupSet,
+                        groupSets,
+                        aggregateCalls,
+                        collation
+                );
+            }
+
+            @Override
+            public IgniteRel makeProject(RelOptCluster cluster, RelNode input, 
List<RexNode> reduceInputExprs,
+                    RelDataType projectRowType) {
+                return new IgniteProject(agg.getCluster(),
+                        input.getTraitSet(),
+                        input,
+                        reduceInputExprs,
+                        projectRowType
+                );
+            }
+
+            @Override
+            public IgniteRel makeReduceAgg(RelOptCluster cluster, RelNode 
input, ImmutableBitSet groupSet,
+                    List<ImmutableBitSet> groupSets, List<AggregateCall> 
aggregateCalls, RelDataType outputType) {
+                assert distribution(input) == single();
+
+                return new IgniteReduceSortAggregate(
+                        cluster,
+                        outTraits.replace(single()),
+                        convert(input, outTraits.replace(single())),
+                        groupSet,
+                        groupSets,
+                        aggregateCalls,
+                        outputType,
+                        collation
+                );
+            }
+        };
+
+        call.transformTo(MapReduceAggregates.buildAggregates(agg, exchange, 
relBuilder, fieldMappingOnReduce));
+    }
+
+    /** Configuration. */
+    @SuppressWarnings({"ClassNameSameAsAncestorName", 
"InnerClassFieldHidesOuterClassField"})
+    @Value.Immutable
+    public interface Config extends RelRule.Config {
+        SortAggregateExchangeTransposeRule.Config INSTANCE = 
ImmutableSortAggregateExchangeTransposeRule.Config.of()
+                .withDescription("SortAggregateExchangeTransposeRule")
+                .withOperandSupplier(o0 ->
+                        o0.operand(IgniteColocatedSortAggregate.class)
+                                .oneInput(o1 ->
+                                        o1.operand(IgniteExchange.class)
+                                                .anyInputs()))
+                .as(SortAggregateExchangeTransposeRule.Config.class);
+
+        /** {@inheritDoc} */
+        @Override
+        default SortAggregateExchangeTransposeRule toRule() {
+            return new SortAggregateExchangeTransposeRule(this);
+        }
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HintUtils.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HintUtils.java
index 1f5770097b..1d9da87526 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HintUtils.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HintUtils.java
@@ -25,10 +25,10 @@ import java.util.List;
 import java.util.StringJoiner;
 import java.util.stream.Collectors;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.hint.Hintable;
 import org.apache.calcite.rel.hint.RelHint;
-import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.ignite.internal.sql.engine.hint.IgniteHint;
 
 /**
@@ -44,7 +44,7 @@ public class HintUtils {
      *
      * @param rel Logical aggregate to check on expand distinct aggregate hint.
      */
-    public static boolean isExpandDistinctAggregate(LogicalAggregate rel) {
+    public static boolean isExpandDistinctAggregate(Aggregate rel) {
         return rel.getHints().stream()
                 .anyMatch(r -> r.hintName.equals(EXPAND_DISTINCT_AGG.name()))
                 && 
rel.getAggCallList().stream().anyMatch(AggregateCall::isDistinct);
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
index ded0e04155..b5511c2068 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
@@ -315,30 +315,19 @@ public class AggregatePlannerTest extends 
AbstractAggregatePlannerTest {
                         ))
                 ));
 
-        assertPlan(TestCase.CASE_17A,
-                hasChildThat(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class)
-                        .and(input(1, 
isInstanceOf(IgniteColocatedHashAggregate.class)
-                                .and(input(isInstanceOf(IgniteLimit.class)
-                                        
.and(input(isInstanceOf(IgniteExchange.class)
-                                                
.and(input(isInstanceOf(IgniteSort.class)
-                                                        
.and(input(isTableScan("TEST")))
-                                                ))
-                                        ))
-                                ))
-                        ))
-                ));
-        assertPlan(TestCase.CASE_17B,
-                hasChildThat(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class)
-                        .and(input(1, 
isInstanceOf(IgniteColocatedHashAggregate.class)
-                                .and(input(isInstanceOf(IgniteLimit.class)
-                                        
.and(input(isInstanceOf(IgniteExchange.class)
-                                                
.and(input(isInstanceOf(IgniteSort.class)
-                                                        
.and(input(isTableScan("TEST")))
-                                                ))
+        Predicate<RelNode> aggregateWithLimit = 
hasChildThat(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class)
+                .and(input(1, isInstanceOf(IgniteColocatedHashAggregate.class)
+                        .and(input(isInstanceOf(IgniteLimit.class)
+                                .and(input(isInstanceOf(IgniteExchange.class)
+                                        
.and(input(isInstanceOf(IgniteSort.class)
+                                                
.and(input(isTableScan("TEST")))
                                         ))
                                 ))
                         ))
-                ));
+                ))
+        );
+        assertPlan(TestCase.CASE_17A, aggregateWithLimit);
+        assertPlan(TestCase.CASE_17B, aggregateWithLimit);
     }
 
     /**
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java
index fec1474ed0..7f6833443e 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java
@@ -49,6 +49,7 @@ public class ColocatedHashAggregatePlannerTest extends AbstractAggregatePlannerT
 
     private final String[] disableRules = {
             "MapReduceHashAggregateConverterRule",
+            "SortAggregateExchangeTransposeRule",
             "MapReduceSortAggregateConverterRule",
             "ColocatedSortAggregateConverterRule"
     };
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java
index 8953888c80..3a92ca2dbf 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java
@@ -49,6 +49,7 @@ public class ColocatedSortAggregatePlannerTest extends AbstractAggregatePlannerT
     private final String[] disableRules = {
             "MapReduceHashAggregateConverterRule",
             "MapReduceSortAggregateConverterRule",
+            "SortAggregateExchangeTransposeRule",
             "ColocatedHashAggregateConverterRule"
     };
 
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java
index 644718598f..52d4333dc6 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java
@@ -49,6 +49,7 @@ import org.junit.jupiter.api.Test;
  */
 public class MapReduceHashAggregatePlannerTest extends AbstractAggregatePlannerTest {
     private final String[] disableRules = {
+            "SortAggregateExchangeTransposeRule",
             "MapReduceSortAggregateConverterRule",
             "ColocatedHashAggregateConverterRule",
             "ColocatedSortAggregateConverterRule"
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java
index 14d8da0739..76fca16b6c 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java
@@ -26,15 +26,17 @@ import java.util.function.Predicate;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableScan;
 import org.apache.ignite.internal.sql.engine.rel.IgniteCorrelatedNestedLoopJoin;
 import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
+import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
 import org.apache.ignite.internal.sql.engine.rel.IgniteLimit;
 import org.apache.ignite.internal.sql.engine.rel.IgniteMergeJoin;
 import org.apache.ignite.internal.sql.engine.rel.IgniteProject;
 import org.apache.ignite.internal.sql.engine.rel.IgniteSort;
+import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
+import org.apache.ignite.internal.sql.engine.rel.agg.IgniteColocatedSortAggregate;
 import org.apache.ignite.internal.sql.engine.rel.agg.IgniteMapSortAggregate;
 import org.apache.ignite.internal.sql.engine.rel.agg.IgniteReduceSortAggregate;
 import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Collation;
@@ -52,7 +54,6 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT
     private final String[] disableRules = {
             "MapReduceHashAggregateConverterRule",
             "ColocatedHashAggregateConverterRule",
-            "ColocatedSortAggregateConverterRule"
     };
 
     /**
@@ -125,10 +126,10 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT
         checkSimpleAggWithGroupByHash(TestCase.CASE_5B);
         checkSimpleAggWithGroupByHash(TestCase.CASE_6B);
 
-        checkSimpleAggWithGroupByHash(TestCase.CASE_5C);
-        checkSimpleAggWithGroupByHash(TestCase.CASE_6C);
+        checkSimpleAggWithColocatedGroupByHash(TestCase.CASE_5C);
+        checkSimpleAggWithColocatedGroupByHash(TestCase.CASE_6C);
 
-        checkSimpleAggWithGroupByHash(TestCase.CASE_5D);
+        checkSimpleAggWithColocatedGroupByHash(TestCase.CASE_5D);
     }
 
     /**
@@ -150,13 +151,13 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT
         checkDistinctAggWithGroupByHash(TestCase.CASE_7_2B);
         checkDistinctAggWithGroupByHash(TestCase.CASE_7_3B);
 
-        checkDistinctAggWithGroupByHash(TestCase.CASE_7_1C);
-        checkDistinctAggWithGroupByHash(TestCase.CASE_7_2C);
-        checkDistinctAggWithGroupByHash(TestCase.CASE_7_3C);
+        checkDistinctAggWithColocatedGroupByHash(TestCase.CASE_7_1C);
+        checkDistinctAggWithColocatedGroupByHash(TestCase.CASE_7_2C);
+        checkDistinctAggWithColocatedGroupByHash(TestCase.CASE_7_3C);
 
-        checkDistinctAggWithGroupByHash(TestCase.CASE_7_1D);
-        checkDistinctAggWithGroupByHash(TestCase.CASE_7_2D);
-        checkDistinctAggWithGroupByHash(TestCase.CASE_7_3D);
+        checkDistinctAggWithColocatedGroupByHash(TestCase.CASE_7_1D);
+        checkDistinctAggWithColocatedGroupByHash(TestCase.CASE_7_2D);
+        checkDistinctAggWithColocatedGroupByHash(TestCase.CASE_7_3D);
     }
 
     /**
@@ -175,11 +176,11 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT
         checkSimpleAggWithGroupByHash(TestCase.CASE_8_1B);
         checkSimpleAggWithGroupByHash(TestCase.CASE_8_2B);
 
-        checkSimpleAggWithGroupByHash(TestCase.CASE_8_1C);
-        checkSimpleAggWithGroupByHash(TestCase.CASE_8_2C);
+        checkSimpleAggWithColocatedGroupByHash(TestCase.CASE_8_1C);
+        checkSimpleAggWithColocatedGroupByHash(TestCase.CASE_8_2C);
 
-        checkSimpleAggWithGroupByHash(TestCase.CASE_8_1D);
-        checkSimpleAggWithGroupByHash(TestCase.CASE_8_2D);
+        checkSimpleAggWithColocatedGroupByHash(TestCase.CASE_8_1D);
+        checkSimpleAggWithColocatedGroupByHash(TestCase.CASE_8_2D);
     }
 
     /**
@@ -201,11 +202,11 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT
         checkAggWithGroupByIndexColumnsHash(TestCase.CASE_10B);
         checkAggWithGroupByIndexColumnsHash(TestCase.CASE_11B);
 
-        checkAggWithGroupByIndexColumnsHash(TestCase.CASE_9C);
-        checkAggWithGroupByIndexColumnsHash(TestCase.CASE_10C);
-        checkAggWithGroupByIndexColumnsHash(TestCase.CASE_11C);
+        checkAggWithColocatedGroupByIndexColumnsHash(TestCase.CASE_9C);
+        checkAggWithColocatedGroupByIndexColumnsHash(TestCase.CASE_10C);
+        checkAggWithColocatedGroupByIndexColumnsHash(TestCase.CASE_11C);
 
-        checkAggWithGroupByIndexColumnsHash(TestCase.CASE_9D);
+        checkAggWithColocatedGroupByIndexColumnsHash(TestCase.CASE_9D);
     }
 
     /**
@@ -231,8 +232,8 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT
         checkGroupWithNoAggregateUseIndexHash(TestCase.CASE_13A);
         checkGroupWithNoAggregateUseIndexHash(TestCase.CASE_13B);
 
-        checkGroupWithNoAggregateUseIndexHash(TestCase.CASE_13C);
-        checkGroupWithNoAggregateUseIndexHash(TestCase.CASE_13D);
+        checkColocatedGroupWithNoAggregateUseIndexHash(TestCase.CASE_13C);
+        checkColocatedGroupWithNoAggregateUseIndexHash(TestCase.CASE_13D);
     }
 
     /**
@@ -263,36 +264,22 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT
         String[] additionalRulesToDisable = {"NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", "CorrelateToNestedLoopRule"};
 
         assertPlan(TestCase.CASE_16,
-                nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
-                        .and(input(isInstanceOf(IgniteMapSortAggregate.class)
-                                .and(input(isIndexScan("TEST", "idx_val0")))
-                        ))
+                nodeOrAnyChild(isInstanceOf(IgniteColocatedSortAggregate.class)
+                        .and(input(isIndexScan("TEST", "idx_val0")))
                 ),
                 ArrayUtils.concat(disableRules, additionalRulesToDisable)
         );
 
-        assertPlan(TestCase.CASE_16A,
-                nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
-                        .and(input(isInstanceOf(IgniteExchange.class)
-                                .and(hasDistribution(single()))
-                                
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
-                                        .and(input(isIndexScan("TEST", 
"idx_val0")))
-                                ))
-                        ))
-                ),
-                ArrayUtils.concat(disableRules, additionalRulesToDisable)
-        );
-        assertPlan(TestCase.CASE_16B,
-                nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
-                        .and(input(isInstanceOf(IgniteExchange.class)
-                                .and(hasDistribution(single()))
-                                
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
-                                        .and(input(isIndexScan("TEST", 
"idx_val0")))
-                                ))
+        Predicate<RelNode> aggregateOverIndexScan = nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
+                .and(input(isInstanceOf(IgniteExchange.class)
+                        .and(hasDistribution(single()))
+                        .and(input(isInstanceOf(IgniteMapSortAggregate.class)
+                                .and(input(isIndexScan("TEST", "idx_val0")))
                         ))
-                ),
-                ArrayUtils.concat(disableRules, additionalRulesToDisable)
+                ))
         );
+        assertPlan(TestCase.CASE_16A, aggregateOverIndexScan, ArrayUtils.concat(disableRules, additionalRulesToDisable));
+        assertPlan(TestCase.CASE_16B, aggregateOverIndexScan, ArrayUtils.concat(disableRules, additionalRulesToDisable));
     }
 
     /**
@@ -302,12 +289,10 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT
     public void emptyCollationPassThroughLimit() throws Exception {
         assertPlan(TestCase.CASE_17,
                 hasChildThat(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class)
-                        .and(input(1, 
isInstanceOf(IgniteReduceSortAggregate.class)
-                                
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
-                                        
.and(input(isInstanceOf(IgniteLimit.class)
-                                                
.and(input(isInstanceOf(IgniteSort.class)
-                                                        
.and(input(isTableScan("TEST")))
-                                                ))
+                        .and(input(1, 
isInstanceOf(IgniteColocatedSortAggregate.class)
+                                .and(input(isInstanceOf(IgniteLimit.class)
+                                        
.and(input(isInstanceOf(IgniteSort.class)
+                                                
.and(input(isTableScan("TEST")))
                                         ))
                                 ))
                         ))
@@ -315,40 +300,21 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT
                 disableRules
         );
 
-        assertPlan(TestCase.CASE_17A,
-                hasChildThat(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class)
-                        .and(input(1, 
isInstanceOf(IgniteReduceSortAggregate.class)
-                                
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
-                                        
.and(input(isInstanceOf(IgniteLimit.class)
-                                                
.and(input(isInstanceOf(IgniteExchange.class)
-                                                        
.and(hasDistribution(single()))
-                                                        
.and(input(isInstanceOf(IgniteSort.class)
-                                                                
.and(input(isTableScan("TEST")))
-                                                        ))
-                                                ))
-                                        ))
-                                ))
-                        ))
-                ),
-                disableRules
-        );
-        assertPlan(TestCase.CASE_17B,
-                hasChildThat(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class)
-                        .and(input(1, 
isInstanceOf(IgniteReduceSortAggregate.class)
-                                
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
-                                        
.and(input(isInstanceOf(IgniteLimit.class)
-                                                
.and(input(isInstanceOf(IgniteExchange.class)
-                                                        
.and(hasDistribution(single()))
-                                                        
.and(input(isInstanceOf(IgniteSort.class)
-                                                                
.and(input(isTableScan("TEST")))
-                                                        ))
-                                                ))
+        Predicate<RelNode> aggregateWithLimit = hasChildThat(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class)
+                .and(input(1, isInstanceOf(IgniteColocatedSortAggregate.class)
+                        .and(input(isInstanceOf(IgniteLimit.class)
+                                .and(input(isInstanceOf(IgniteExchange.class)
+                                        .and(hasDistribution(single()))
+                                        
.and(input(isInstanceOf(IgniteSort.class)
+                                                
.and(input(isTableScan("TEST")))
                                         ))
                                 ))
                         ))
-                ),
-                disableRules
+                ))
         );
+
+        assertPlan(TestCase.CASE_17A, aggregateWithLimit, disableRules);
+        assertPlan(TestCase.CASE_17B, aggregateWithLimit, disableRules);
     }
 
     /**
@@ -400,21 +366,15 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT
      */
     @Test
     public void expandDistinctAggregates() throws Exception {
-        Predicate<? extends RelNode> subtreePredicate = 
isInstanceOf(IgniteReduceSortAggregate.class)
+        Predicate<? extends RelNode> subtreePredicate = 
isInstanceOf(IgniteColocatedSortAggregate.class)
                 // Check the second aggregation step contains accumulators.
                 // Plan must not contain distinct accumulators.
                 .and(hasAggregate())
                 .and(not(hasDistinctAggregate()))
-                .and(input(isInstanceOf(IgniteMapSortAggregate.class)
                         // Check the first aggregation step is SELECT DISTINCT 
(doesn't contain any accumulators)
-                        
.and(input(isInstanceOf(IgniteReduceSortAggregate.class)
+                        
.and(input(isInstanceOf(IgniteColocatedSortAggregate.class)
                                 .and(not(hasAggregate()))
                                 .and(hasGroups())
-                                
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
-                                        .and(not(hasAggregate()))
-                                        .and(hasGroups())
-                                ))
-                        ))
                 ));
 
         assertPlan(TestCase.CASE_21, 
nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
@@ -422,25 +382,25 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT
                 .and(input(1, subtreePredicate))
         ), disableRules);
 
-        subtreePredicate = isInstanceOf(IgniteReduceSortAggregate.class)
+        subtreePredicate = 
nodeOrAnyChild(isInstanceOf(IgniteColocatedSortAggregate.class)
                 // Check the second aggregation step contains accumulators.
                 // Plan must not contain distinct accumulators.
                 .and(hasAggregate())
                 .and(not(hasDistinctAggregate()))
-                .and(input(isInstanceOf(IgniteMapSortAggregate.class)
-                        // Check the first aggregation step is SELECT DISTINCT 
(doesn't contain any accumulators)
-                        
.and(input(isInstanceOf(IgniteReduceSortAggregate.class)
-                                .and(not(hasAggregate()))
-                                .and(hasGroups())
-                                .and(input(isInstanceOf(IgniteExchange.class)
-                                        .and(hasDistribution(single()))
-                                        
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
-                                                .and(not(hasAggregate()))
-                                                .and(hasGroups())
-                                        ))
+                // Check the first aggregation step is SELECT DISTINCT 
(doesn't contain any accumulators)
+                .and(input(isInstanceOf(IgniteReduceSortAggregate.class)
+                        .and(not(hasAggregate()))
+                        .and(hasGroups())
+                        .and(input(isInstanceOf(IgniteExchange.class)
+                                .and(hasDistribution(single()))
+                                
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
+                                        .and(not(hasAggregate()))
+                                        .and(hasGroups())
+                                        
.and(input(isInstanceOf(IgniteIndexScan.class)))
                                 ))
                         ))
-                ));
+                ))
+        );
 
         assertPlan(TestCase.CASE_21A, 
nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
                 .and(input(0, subtreePredicate))
@@ -473,10 +433,17 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT
                         ))
                 ));
 
+        Predicate<IgniteExchange> colocated = isInstanceOf(IgniteExchange.class)
+                .and(hasDistribution(single()))
+                .and(input(isInstanceOf(IgniteColocatedSortAggregate.class)
+                        .and(in -> 
hasAggregates(countMap).test(in.getAggCallList()))
+                        .and(input(isInstanceOf(IgniteSort.class)))
+                ));
+
         assertPlan(TestCase.CASE_22, nonColocated, disableRules);
         assertPlan(TestCase.CASE_22A, nonColocated, disableRules);
-        assertPlan(TestCase.CASE_22B, nonColocated, disableRules);
-        assertPlan(TestCase.CASE_22C, nonColocated, disableRules);
+        assertPlan(TestCase.CASE_22B, colocated, disableRules);
+        assertPlan(TestCase.CASE_22C, colocated, disableRules);
     }
 
     /**
@@ -485,6 +452,9 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT
      */
     @Test
     public void twoPhaseAvgAgg() throws Exception {
+        Predicate<AggregateCall> avgColocated = (a) ->
+                Objects.equals(a.getAggregation().getName(), "AVG") && 
a.getArgList().equals(List.of(1));
+
         Predicate<AggregateCall> sumMap = (a) ->
                 Objects.equals(a.getAggregation().getName(), "SUM") && 
a.getArgList().equals(List.of(1));
 
@@ -499,19 +469,26 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT
 
         Predicate<RelNode> nonColocated = 
hasChildThat(isInstanceOf(IgniteReduceSortAggregate.class)
                 .and(in -> hasAggregates(sumReduce, 
sum0Reduce).test(in.getAggregateCalls()))
-                .and(input(isInstanceOf(IgniteProject.class)
-                        .and(input(isInstanceOf(IgniteExchange.class)
+                .and(input(isInstanceOf(IgniteExchange.class)
                         .and(hasDistribution(single()))
-                        .and(input(isInstanceOf(IgniteMapSortAggregate.class)
+                        .and(input(isInstanceOf(IgniteProject.class)
+                                
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
                                         .and(in -> hasAggregates(sumMap, 
countMap).test(in.getAggCallList()))
-                                )
+                                ))
                         ))
-                ))));
+                )));
+
+        Predicate<RelNode> colocated = nodeOrAnyChild(isInstanceOf(IgniteExchange.class)
+                .and(hasDistribution(single()))
+                .and(input(isInstanceOf(IgniteColocatedSortAggregate.class)
+                        .and(in -> 
hasAggregates(avgColocated).test(in.getAggCallList()))
+                        .and(input(isInstanceOf(IgniteSort.class)))
+                )));
 
         assertPlan(TestCase.CASE_23, nonColocated, disableRules);
         assertPlan(TestCase.CASE_23A, nonColocated, disableRules);
-        assertPlan(TestCase.CASE_23B, nonColocated, disableRules);
-        assertPlan(TestCase.CASE_23C, nonColocated, disableRules);
+        assertPlan(TestCase.CASE_23B, colocated, disableRules);
+        assertPlan(TestCase.CASE_23C, colocated, disableRules);
     }
 
     /**
@@ -519,23 +496,10 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT
      */
     @Test
     public void countDistinctGroupSetSingle() throws Exception {
-        Predicate<IgniteReduceSortAggregate> inputAgg = 
isInstanceOf(IgniteReduceSortAggregate.class)
-                .and(hasGroupSets(IgniteReduceSortAggregate::getGroupSets, 0))
-                .and(hasCollation(RelCollations.of(0)))
-                        .and(input(isInstanceOf(IgniteMapSortAggregate.class)
-                        .and(hasCollation(RelCollations.of(1)))
-                        .and(hasGroupSets(Aggregate::getGroupSets, 1))
-                ));
-
-        assertPlan(TestCase.CASE_24_1, 
nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
-                        
.and(hasNoGroupSets(IgniteReduceSortAggregate::getGroupSets))
+        assertPlan(TestCase.CASE_24_1, 
nodeOrAnyChild(isInstanceOf(IgniteColocatedSortAggregate.class)
+                        
.and(hasNoGroupSets(IgniteColocatedSortAggregate::getGroupSets))
                         .and(hasCollation(RelCollations.EMPTY))
-                        .and(input(isInstanceOf(IgniteMapSortAggregate.class)
-                                
.and(hasNoGroupSets(IgniteMapSortAggregate::getGroupSets))
-                                .and(hasCollation(RelCollations.EMPTY))
-                                
.and(input(isInstanceOf(IgniteProject.class).and(input(inputAgg)))
-                                ))
-                        )),
+                        .and(input(isInstanceOf(IgniteTableScan.class)))),
                 disableRules);
     }
 
@@ -572,14 +536,10 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT
     }
 
     private void checkSimpleAggSingle(TestCase testCase) throws Exception {
-        assertPlan(testCase,
-                nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
-                        .and(input(isInstanceOf(IgniteMapSortAggregate.class)
-                                .and(hasAggregate())
-                                .and(hasGroups())
-                                .and(input(isTableScan("TEST")))
-                        ))
-                ),
+        assertPlan(testCase, 
nodeOrAnyChild(isInstanceOf(IgniteColocatedSortAggregate.class)
+                        .and(hasAggregate())
+                        .and(hasGroups())
+                        .and(input(isTableScan("TEST")))),
                 disableRules
         );
     }
@@ -602,13 +562,11 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT
 
     private void checkSimpleAggWithGroupBySingle(TestCase testCase) throws Exception {
         assertPlan(testCase,
-                nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
-                        .and(input(isInstanceOf(IgniteMapSortAggregate.class)
-                                .and(hasAggregate())
-                                .and(hasGroups())
-                                .and(input(isInstanceOf(IgniteSort.class)
-                                        .and(input(isTableScan("TEST")))
-                                ))
+                nodeOrAnyChild(isInstanceOf(IgniteColocatedSortAggregate.class)
+                        .and(hasAggregate())
+                        .and(hasGroups())
+                        .and(input(isInstanceOf(IgniteSort.class)
+                                .and(input(isTableScan("TEST")))
                         ))
                 ),
                 disableRules
@@ -633,23 +591,15 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT
         );
     }
 
-    private void checkDistinctAggSingle(TestCase testCase) throws Exception {
+    private void checkSimpleAggWithColocatedGroupByHash(TestCase testCase) throws Exception {
         assertPlan(testCase,
-                nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
-                        .and(hasAggregate())
-                        .and(not(hasDistinctAggregate()))
-                        .and(input(isInstanceOf(IgniteMapSortAggregate.class)
+                nodeOrAnyChild(isInstanceOf(IgniteExchange.class)
+                        .and(hasDistribution(single()))
+                        
.and(input(isInstanceOf(IgniteColocatedSortAggregate.class)
                                 .and(hasAggregate())
-                                .and(not(hasDistinctAggregate()))
-                                
.and(input(isInstanceOf(IgniteReduceSortAggregate.class)
-                                        .and(not(hasAggregate()))
-                                        .and(hasGroups())
-                                        
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
-                                                .and(not(hasAggregate()))
-                                                
.and(input(isInstanceOf(IgniteSort.class)
-                                                        
.and(input(isTableScan("TEST")))
-                                                ))
-                                        ))
+                                .and(hasGroups())
+                                .and(input(isInstanceOf(IgniteSort.class)
+                                        .and(input(isTableScan("TEST")))
                                 ))
                         ))
                 ),
@@ -657,40 +607,53 @@ public class MapReduceSortAggregatePlannerTest extends AbstractAggregatePlannerT
         );
     }
 
+    private void checkDistinctAggSingle(TestCase testCase) throws Exception {
+        assertPlan(testCase,
+                nodeOrAnyChild(isInstanceOf(IgniteColocatedSortAggregate.class)
+                        .and((hasDistinctAggregate()))
+                        // Without GROUP BY sort can be omitted.
+                        .and(input(isTableScan("TEST")))
+                ),
+                disableRules
+        );
+    }
+
     private void checkDistinctAggHash(TestCase testCase) throws Exception {
-        assertPlan(testCase, 
nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
-                        .and(hasAggregate())
-                        .and(not(hasDistinctAggregate()))
-                        .and(input(isInstanceOf(IgniteMapSortAggregate.class)
-                                .and(hasAggregate())
-                                .and(not(hasDistinctAggregate()))
-                                
.and(input(isInstanceOf(IgniteReduceSortAggregate.class)
-                                        .and(not(hasAggregate()))
-                                        .and(hasGroups())
-                                        
.and(input(isInstanceOf(IgniteExchange.class)
-                                                .and(hasDistribution(single()))
-                                                
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
-                                                        
.and(not(hasAggregate()))
-                                                        
.and(input(isInstanceOf(IgniteSort.class)
-                                                                
.and(input(isTableScan("TEST")))
-                                                        ))
-                                                ))
-                                        ))
-                                ))
-                        ))),
+        assertPlan(testCase,
+                nodeOrAnyChild(isInstanceOf(IgniteColocatedSortAggregate.class)
+                        .and(hasDistinctAggregate())
+                        .and(hasGroups())
+                        .and(input(isInstanceOf(IgniteExchange.class)
+                                .and(hasDistribution(single()))
+                                .and(input(isTableScan("TEST")))
+                        ))
+                ),
                 disableRules
         );
     }
 
     private void checkDistinctAggWithGroupBySingle(TestCase testCase) throws Exception {
         assertPlan(testCase,
-                nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
+                nodeOrAnyChild(isInstanceOf(IgniteColocatedSortAggregate.class)
+                        .and(hasDistinctAggregate())
+                        .and(hasGroups())
+                        .and(input(isInstanceOf(IgniteSort.class)
+                                .and(input(isTableScan("TEST")))
+                        ))),
+                disableRules
+        );
+    }
+
+    private void checkDistinctAggWithGroupByHash(TestCase testCase) throws Exception {
+        assertPlan(testCase,
+                nodeOrAnyChild(isInstanceOf(IgniteColocatedSortAggregate.class)
                         .and(hasAggregate())
                         .and(not(hasDistinctAggregate()))
-                        .and(input(isInstanceOf(IgniteMapSortAggregate.class)
-                                
.and(input(isInstanceOf(IgniteReduceSortAggregate.class)
-                                        .and(not(hasAggregate()))
-                                        .and(hasGroups())
+                        
.and(input(isInstanceOf(IgniteReduceSortAggregate.class)
+                                .and(not(hasAggregate()))
+                                .and(hasGroups())
+                                .and(input(isInstanceOf(IgniteExchange.class)
+                                        .and(hasDistribution(single()))
                                         
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
                                                 .and(not(hasAggregate()))
                                                 .and(hasGroups())
@@ -700,46 +663,31 @@ public class MapReduceSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
                                         ))
                                 ))
                         ))
-
                 ),
                 disableRules
         );
     }
 
-    private void checkDistinctAggWithGroupByHash(TestCase testCase) throws 
Exception {
+    private void checkDistinctAggWithColocatedGroupByHash(TestCase testCase) 
throws Exception {
         assertPlan(testCase,
-                nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
-                        .and(hasAggregate())
-                        .and(not(hasDistinctAggregate()))
-                        .and(input(isInstanceOf(IgniteMapSortAggregate.class)
-                                
.and(input(isInstanceOf(IgniteReduceSortAggregate.class)
-                                        .and(not(hasAggregate()))
-                                        .and(hasGroups())
-                                        
.and(input(isInstanceOf(IgniteExchange.class)
-                                                .and(hasDistribution(single()))
-                                                
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
-                                                        
.and(not(hasAggregate()))
-                                                        .and(hasGroups())
-                                                        
.and(input(isInstanceOf(IgniteSort.class)
-                                                                
.and(input(isTableScan("TEST")))
-                                                        ))
-                                                ))
-                                        ))
+                nodeOrAnyChild(isInstanceOf(IgniteExchange.class)
+                        .and(hasDistribution(single()))
+                        
.and(input(isInstanceOf(IgniteColocatedSortAggregate.class)
+                                .and(hasDistinctAggregate())
+                                .and(hasGroups())
+                                .and(input(isInstanceOf(IgniteSort.class)
+                                        .and(input(isTableScan("TEST")))
                                 ))
-                        ))
-
-                ),
+                        ))),
                 disableRules
         );
     }
 
     private void checkAggWithGroupByIndexColumnsSingle(TestCase testCase) 
throws Exception {
         assertPlan(testCase,
-                nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
-                        .and(input(isInstanceOf(IgniteMapSortAggregate.class)
-                                .and(hasAggregate())
-                                .and(input(isIndexScan("TEST", 
"idx_grp0_grp1")))
-                        ))
+                nodeOrAnyChild(isInstanceOf(IgniteColocatedSortAggregate.class)
+                        .and(hasAggregate())
+                        .and(input(isIndexScan("TEST", "idx_grp0_grp1")))
                 ),
                 disableRules
         );
@@ -760,17 +708,27 @@ public class MapReduceSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
         );
     }
 
+    private void checkAggWithColocatedGroupByIndexColumnsHash(TestCase 
testCase) throws Exception {
+        assertPlan(testCase,
+                nodeOrAnyChild(isInstanceOf(IgniteExchange.class)
+                        .and(hasDistribution(single()))
+                        .and(input(isInstanceOf(IgniteProject.class)
+                                
.and(input(isInstanceOf(IgniteColocatedSortAggregate.class)
+                                        .and(hasAggregate())
+                                        .and(input(isIndexScan("TEST", 
"idx_grp0_grp1")))
+                                ))
+                        ))),
+                disableRules
+        );
+    }
+
     private void checkGroupWithNoAggregateSingle(TestCase testCase) throws 
Exception {
         assertPlan(testCase,
-                nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
+                nodeOrAnyChild(isInstanceOf(IgniteColocatedSortAggregate.class)
                         .and(not(hasAggregate()))
                         .and(hasGroups())
-                        .and(input(isInstanceOf(IgniteMapSortAggregate.class)
-                                .and(not(hasAggregate()))
-                                .and(hasGroups())
-                                .and(input(isInstanceOf(IgniteSort.class)
-                                        .and(input(isTableScan("TEST")))
-                                ))
+                        .and(input(isInstanceOf(IgniteSort.class)
+                                .and(input(isTableScan("TEST")))
                         ))
                 ),
                 disableRules
@@ -799,14 +757,10 @@ public class MapReduceSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
 
     private void checkGroupWithNoAggregateUseIndexSingle(TestCase testCase) 
throws Exception {
         assertPlan(testCase,
-                nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
+                nodeOrAnyChild(isInstanceOf(IgniteColocatedSortAggregate.class)
                         .and(not(hasAggregate()))
                         .and(hasGroups())
-                        .and(input(isInstanceOf(IgniteMapSortAggregate.class)
-                                .and(not(hasAggregate()))
-                                .and(hasGroups())
-                                .and(input(isIndexScan("TEST", 
"idx_grp0_grp1")))
-                        ))
+                        .and(input(isIndexScan("TEST", "idx_grp0_grp1")))
                 ),
                 disableRules
         );
@@ -830,14 +784,26 @@ public class MapReduceSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
         );
     }
 
+    private void checkColocatedGroupWithNoAggregateUseIndexHash(TestCase 
testCase) throws Exception {
+        assertPlan(testCase,
+                nodeOrAnyChild(isInstanceOf(IgniteExchange.class)
+                        .and(hasDistribution(single()))
+                        
.and(input(isInstanceOf(IgniteColocatedSortAggregate.class)
+                                .and(not(hasAggregate()))
+                                .and(hasGroups())
+                                .and(input(isIndexScan("TEST", 
"idx_grp0_grp1")))
+                        ))
+                ),
+                disableRules
+        );
+    }
+
     private void checkGroupsWithOrderBySubsetOfGroupColumnsSingle(TestCase 
testCase, RelCollation collation) throws Exception {
         assertPlan(testCase,
-                nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
-                        .and(input(isInstanceOf(IgniteMapSortAggregate.class)
-                                .and(input(isInstanceOf(IgniteSort.class)
-                                        .and(s -> 
s.collation().equals(collation))
-                                        .and(input(isTableScan("TEST")))
-                                ))
+                nodeOrAnyChild(isInstanceOf(IgniteColocatedSortAggregate.class)
+                        .and(input(isInstanceOf(IgniteSort.class)
+                                .and(s -> s.collation().equals(collation))
+                                .and(input(isTableScan("TEST")))
                         ))),
                 disableRules
         );
@@ -863,14 +829,10 @@ public class MapReduceSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
         assertPlan(testCase,
                 isInstanceOf(IgniteSort.class)
                         .and(s -> 
s.collation().equals(TraitUtils.createCollation(List.of(0, 1, 2))))
-                        .and(input(isInstanceOf(IgniteProject.class)
-                                
.and(input(isInstanceOf(IgniteReduceSortAggregate.class)
-                                        
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
-                                                
.and(input(isInstanceOf(IgniteSort.class)
-                                                        .and(s -> 
s.collation().equals(collation))
-                                                        
.and(input(isTableScan("TEST")
-                                                        ))
-                                                ))
+                        
.and(input(isInstanceOf(IgniteColocatedSortAggregate.class)
+                                .and(input(isInstanceOf(IgniteSort.class)
+                                        .and(s -> 
s.collation().equals(collation))
+                                        .and(input(isTableScan("TEST")
                                         ))
                                 ))
                         )),
@@ -901,41 +863,27 @@ public class MapReduceSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
     }
 
     private void checkCountDistinctHash(TestCase testCase) throws Exception {
-        Predicate<IgniteReduceSortAggregate> inputAgg = 
isInstanceOf(IgniteReduceSortAggregate.class)
-                .and(hasGroupSets(IgniteReduceSortAggregate::getGroupSets, 0))
-                .and(hasCollation(RelCollations.of(0)))
-                .and(input(isInstanceOf(IgniteExchange.class)
-                        .and(hasDistribution(single()))
-                        .and(hasCollation(RelCollations.of(0)))
-                        .and(input(isInstanceOf(IgniteMapSortAggregate.class)
-                                .and(hasGroupSets(Aggregate::getGroupSets, 1))
-                        ))
-                ));
-
-        assertPlan(testCase, 
nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
-                        
.and(hasNoGroupSets(IgniteReduceSortAggregate::getGroupSets))
+        // Aggregate with a single group doesn't require an additional sort.
+        // Map-reduce with an additional sort is much more costly.
+        assertPlan(testCase, 
nodeOrAnyChild(isInstanceOf(IgniteColocatedSortAggregate.class)
+                        
.and(hasNoGroupSets(IgniteColocatedSortAggregate::getGroupSets))
                         .and(hasCollation(RelCollations.EMPTY))
-                        .and(input(isInstanceOf(IgniteMapSortAggregate.class)
-                                
.and(hasNoGroupSets(IgniteMapSortAggregate::getGroupSets))
-                                .and(hasCollation(RelCollations.EMPTY))
-                                
.and(input(isInstanceOf(IgniteProject.class).and(input(inputAgg)))
-                                ))
-                        )),
+                        .and(input(isInstanceOf(IgniteExchange.class)
+                                .and(hasDistribution(single()))
+                                .and(input(isInstanceOf(TableScan.class)))
+                        ))
+                ),
                 disableRules);
     }
 
     private void checkDerivedCollationWithOrderByGroupColumnSingle(TestCase 
testCase) throws Exception {
         RelCollation requiredCollation = 
RelCollations.of(TraitUtils.createFieldCollation(0, 
Collation.DESC_NULLS_FIRST));
 
-        assertPlan(testCase, 
nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
+        assertPlan(testCase, 
nodeOrAnyChild(isInstanceOf(IgniteColocatedSortAggregate.class)
                 .and(hasAggregate())
-                .and(input(isInstanceOf(IgniteMapSortAggregate.class)
-                        .and(hasAggregate())
-                        .and(input(isInstanceOf(IgniteSort.class)
-                                .and(hasCollation(requiredCollation))
-                        ))
+                .and(input(isInstanceOf(IgniteSort.class)
+                        .and(hasCollation(requiredCollation))
                 ))
-
         ), disableRules);
     }
 
@@ -961,17 +909,10 @@ public class MapReduceSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
                 TraitUtils.createFieldCollation(1, Collation.DESC_NULLS_FIRST),
                 TraitUtils.createFieldCollation(0, Collation.ASC_NULLS_LAST)
         );
-        RelCollation outputCollation = RelCollations.of(
-                TraitUtils.createFieldCollation(1, Collation.DESC_NULLS_FIRST)
-        );
 
-        assertPlan(testCase, isInstanceOf(Project.class)
-                        .and(hasCollation(outputCollation))
-                        
.and(input(isInstanceOf(IgniteReduceSortAggregate.class)
-                                
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
-                                        
.and(hasChildThat(hasCollation(requiredCollation)))
-                                ))
-                        )),
+        assertPlan(testCase, isInstanceOf(IgniteColocatedSortAggregate.class)
+                        .and(hasCollation(requiredCollation))
+                        .and(hasChildThat(hasCollation(requiredCollation))),
                 disableRules);
     }
 
@@ -980,13 +921,10 @@ public class MapReduceSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
                 TraitUtils.createFieldCollation(1, Collation.DESC_NULLS_FIRST),
                 TraitUtils.createFieldCollation(0, Collation.ASC_NULLS_LAST)
         );
-        RelCollation outputCollation = RelCollations.of(
-                TraitUtils.createFieldCollation(1, Collation.DESC_NULLS_FIRST)
-        );
 
         assertPlan(testCase,
                 isInstanceOf(IgniteProject.class)
-                        .and(hasCollation(outputCollation))
+                        .and(hasCollation(requiredCollation))
                         
.and(input(isInstanceOf(IgniteReduceSortAggregate.class)
                                 .and(input(isInstanceOf(IgniteExchange.class)
                                         .and(hasDistribution(single()))

Reply via email to