This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 3cb914dd7a IGNITE-21580 Sql. Optimise query plans when using two phase 
aggregates (#3552)
3cb914dd7a is described below

commit 3cb914dd7ad16a879248c71545ed8083f3d80f32
Author: Andrew V. Mashenkov <[email protected]>
AuthorDate: Mon Apr 8 13:44:38 2024 +0300

    IGNITE-21580 Sql. Optimise query plans when using two phase aggregates 
(#3552)
---
 .../internal/sql/engine/ItAggregatesTest.java      |   2 +-
 .../engine/metadata/IgniteMdDistinctRowCount.java  |  30 ++-
 .../internal/sql/engine/rel/IgniteAggregate.java   |  22 +-
 .../internal/sql/engine/trait/TraitUtils.java      |   8 +-
 .../planner/AbstractAggregatePlannerTest.java      |  30 ---
 .../sql/engine/planner/AbstractPlannerTest.java    |  40 +++-
 .../sql/engine/planner/AggregatePlannerTest.java   | 236 ++++++++++-----------
 .../planner/ColocatedHashAggregatePlannerTest.java |  18 +-
 .../planner/MapReduceHashAggregatePlannerTest.java |  92 ++++----
 .../planner/MapReduceSortAggregatePlannerTest.java |  28 +--
 .../sql/engine/planner/TpchQueryPlannerTest.java   | 121 +++++++++++
 .../src/test/resources/mapping/correlated.test     |  15 +-
 .../resources/mapping/test_partition_pruning.test  |  10 +-
 .../src/test/resources/mapping/union.test          |  50 ++---
 .../src/test/resources/tpch/plan/q1.plan           |   7 +
 15 files changed, 429 insertions(+), 280 deletions(-)

diff --git 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItAggregatesTest.java
 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItAggregatesTest.java
index f3e12c117a..ae6b4262d6 100644
--- 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItAggregatesTest.java
+++ 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItAggregatesTest.java
@@ -374,7 +374,7 @@ public class ItAggregatesTest extends 
BaseSqlIntegrationTest {
         var sql = "select distinct name from person";
 
         assertQuery(sql)
-                
.matches(QueryChecker.matches(".*Colocated.*Aggregate.*Exchange.*"))
+                
.matches(QueryChecker.matches(".*ReduceHashAggregate.*Exchange.*MapHashAggregate.*"))
                 .returns("Igor")
                 .returns("Ilya")
                 .returns("Roma")
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/IgniteMdDistinctRowCount.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/IgniteMdDistinctRowCount.java
index 9530ee5059..9b9654631f 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/IgniteMdDistinctRowCount.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/IgniteMdDistinctRowCount.java
@@ -18,6 +18,9 @@
 package org.apache.ignite.internal.sql.engine.metadata;
 
 import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.metadata.CyclicMetadataException;
 import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMdDistinctRowCount;
 import org.apache.calcite.rel.metadata.RelMetadataProvider;
@@ -25,6 +28,8 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.BuiltInMethod;
 import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.NumberUtil;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 /**
  * IgniteMdDistinctRowCount.
@@ -36,6 +41,11 @@ public class IgniteMdDistinctRowCount extends 
RelMdDistinctRowCount {
             ReflectiveRelMetadataProvider.reflectiveSource(
                     BuiltInMethod.DISTINCT_ROW_COUNT.method, new 
IgniteMdDistinctRowCount());
 
+    @Override
+    public Double getDistinctRowCount(Aggregate rel, RelMetadataQuery mq, 
ImmutableBitSet groupKey, @Nullable RexNode predicate) {
+        return rel.estimateRowCount(mq);
+    }
+
     /** {@inheritDoc} */
     @Override
     public Double getDistinctRowCount(
@@ -44,14 +54,22 @@ public class IgniteMdDistinctRowCount extends 
RelMdDistinctRowCount {
             ImmutableBitSet groupKey,
             RexNode predicate
     ) {
-        if (groupKey.cardinality() == 0) {
-            return 1d;
+        RelNode best = rel.getBest();
+        if (best != null) {
+            return mq.getDistinctRowCount(best, groupKey, predicate);
         }
 
-        double rowCount = mq.getRowCount(rel);
-
-        rowCount *= 1.0 - Math.pow(.5, groupKey.cardinality());
+        Double d = null;
+        for (RelNode r2 : rel.getRels()) {
+            try {
+                Double d2 = mq.getDistinctRowCount(r2, groupKey, predicate);
+                d = NumberUtil.min(d, d2);
+            } catch (CyclicMetadataException e) {
+                // Ignore this relational expression; there will be non-cyclic 
ones
+                // in this set.
+            }
+        }
 
-        return rowCount;
+        return d;
     }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteAggregate.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteAggregate.java
index 637d2aca19..0447d3945b 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteAggregate.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteAggregate.java
@@ -64,9 +64,7 @@ public abstract class IgniteAggregate extends Aggregate 
implements IgniteRel {
             return groupsCnt;
         }
 
-        // Estimation of the groups count is not available.
-        // Use heuristic estimation for result rows count.
-        return super.estimateRowCount(mq);
+        return guessDistinctRows(mq, groupSet.cardinality());
     }
 
     /**
@@ -78,11 +76,14 @@ public abstract class IgniteAggregate extends Aggregate 
implements IgniteRel {
 
         if (!aggCalls.isEmpty()) {
             double grps = estimateRowCount(mq);
-            double rows = input.estimateRowCount(mq);
 
             for (AggregateCall aggCall : aggCalls) {
                 if (aggCall.isDistinct()) {
-                    mem += IgniteCost.AGG_CALL_MEM_COST * rows / grps;
+                    ImmutableBitSet aggGroup = 
ImmutableBitSet.of(aggCall.getArgList());
+
+                    double distinctRows = guessDistinctRows(mq, 
aggGroup.cardinality());
+
+                    mem += IgniteCost.AGG_CALL_MEM_COST * distinctRows / grps;
                 } else {
                     mem += IgniteCost.AGG_CALL_MEM_COST;
                 }
@@ -92,6 +93,17 @@ public abstract class IgniteAggregate extends Aggregate 
implements IgniteRel {
         return mem;
     }
 
+    /** Implements heuristics estimation for distinct row count. */
+    private double guessDistinctRows(RelMetadataQuery mq, int groupSize) {
+        if (groupSize == 0) {
+            return 1;
+        } else {
+            double rowCount = mq.getRowCount(getInput());
+            rowCount *= (1.0 - Math.pow(.8, groupSize));
+            return rowCount;
+        }
+    }
+
     /**
      * ComputeSelfCostHash.
      * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
index 638060a75a..644b9c4251 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
@@ -339,6 +339,10 @@ public class TraitUtils {
     public static List<RelNode> derive(Convention convention, 
TraitsAwareIgniteRel rel, List<List<RelTraitSet>> inTraits) {
         assert !nullOrEmpty(inTraits);
 
+        if (inTraits.stream().flatMap(List::stream).anyMatch(traitSet -> 
traitSet.getConvention() != convention)) {
+            return List.of();
+        }
+
         RelTraitSet outTraits = rel.getCluster().traitSetOf(convention);
         Set<Pair<RelTraitSet, List<RelTraitSet>>> combinations = 
combinations(outTraits, inTraits);
 
@@ -346,10 +350,6 @@ public class TraitUtils {
             return List.of();
         }
 
-        if (inTraits.stream().flatMap(List::stream).anyMatch(traitSet -> 
traitSet.getConvention() != convention)) {
-            return List.of();
-        }
-
         PropagationContext context = new PropagationContext(combinations)
                 .propagate(rel::deriveCollation);
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractAggregatePlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractAggregatePlannerTest.java
index 288e662f77..c8ac94287d 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractAggregatePlannerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractAggregatePlannerTest.java
@@ -29,10 +29,8 @@ import java.util.Objects;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.function.UnaryOperator;
-import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.util.ImmutableBitSet;
 import 
org.apache.ignite.internal.sql.engine.framework.TestBuilders.TableBuilder;
 import org.apache.ignite.internal.sql.engine.rel.IgniteAggregate;
 import org.apache.ignite.internal.sql.engine.rel.agg.IgniteReduceAggregateBase;
@@ -40,7 +38,6 @@ import 
org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Collation;
 import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
-import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
 import org.apache.ignite.internal.type.NativeTypes;
 import org.hamcrest.Matchers;
 import org.junit.jupiter.api.AfterAll;
@@ -1066,31 +1063,4 @@ public abstract class AbstractAggregatePlannerTest 
extends AbstractPlannerTest {
             return true;
         };
     }
-
-    <T> Predicate<T> hasGroupSets(Function<T, List<ImmutableBitSet>> 
groupSets, int groupKey) {
-        return (node) -> {
-            List<ImmutableBitSet> allGroupSets = groupSets.apply(node);
-            ImmutableBitSet firstGroupSet = allGroupSets.get(0);
-
-            boolean groupSetsMatch = 
allGroupSets.equals(List.of(ImmutableBitSet.of(groupKey)));
-            boolean groupSetMatches = 
firstGroupSet.equals(ImmutableBitSet.of(groupKey));
-
-            return groupSetMatches && groupSetsMatch;
-        };
-    }
-
-    <T> Predicate<T> hasNoGroupSets(Function<T, List<ImmutableBitSet>> 
groupSets) {
-        return (node) -> {
-            List<ImmutableBitSet> allGroupSets = groupSets.apply(node);
-            List<ImmutableBitSet> emptyGroupSets = 
List.of(ImmutableBitSet.of());
-            return emptyGroupSets.equals(allGroupSets);
-        };
-    }
-
-    <T extends RelNode> Predicate<T> hasCollation(RelCollation expected) {
-        return (node) -> {
-            RelCollation collation = TraitUtils.collation(node);
-            return expected.equals(collation);
-        };
-    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
index 0bd79a619c..efbf99e7ae 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
@@ -54,6 +54,7 @@ import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.AbstractRelNode;
+import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelVisitor;
@@ -102,6 +103,7 @@ import 
org.apache.ignite.internal.sql.engine.schema.IgniteTable;
 import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
 import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
 import org.apache.ignite.internal.sql.engine.util.Commons;
@@ -113,8 +115,7 @@ import org.apache.ignite.internal.util.Pair;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * AbstractPlannerTest.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Base test class for planner tests.
  */
 public abstract class AbstractPlannerTest extends IgniteAbstractTest {
     protected static final String[] DISABLE_KEY_VALUE_MODIFY_RULES = {
@@ -130,11 +131,44 @@ public abstract class AbstractPlannerTest extends 
IgniteAbstractTest {
 
     protected static final int DEFAULT_ZONE_ID = 0;
 
+    private static final SqlExplainLevel DEFAULT_EXPLAIN_LEVEL = 
SqlExplainLevel.EXPPLAN_ATTRIBUTES;
+
     private static final AtomicInteger NEXT_TABLE_ID = new AtomicInteger(2001);
 
     /** Last error message. */
     String lastErrorMsg;
 
+    <T> Predicate<T> hasGroupSets(Function<T, List<ImmutableBitSet>> 
groupSets, int groupKey) {
+        return hasGroupSets(groupSets, List.of(groupKey));
+    }
+
+    <T> Predicate<T> hasGroupSets(Function<T, List<ImmutableBitSet>> 
groupSets, List<Integer> groupKeys) {
+        return (node) -> {
+            List<ImmutableBitSet> allGroupSets = groupSets.apply(node);
+            ImmutableBitSet firstGroupSet = allGroupSets.get(0);
+
+            boolean groupSetsMatch = 
allGroupSets.equals(List.of(ImmutableBitSet.of(groupKeys)));
+            boolean groupSetMatches = 
firstGroupSet.equals(ImmutableBitSet.of(groupKeys));
+
+            return groupSetMatches && groupSetsMatch;
+        };
+    }
+
+    <T> Predicate<T> hasNoGroupSets(Function<T, List<ImmutableBitSet>> 
groupSets) {
+        return (node) -> {
+            List<ImmutableBitSet> allGroupSets = groupSets.apply(node);
+            List<ImmutableBitSet> emptyGroupSets = 
List.of(ImmutableBitSet.of());
+            return emptyGroupSets.equals(allGroupSets);
+        };
+    }
+
+    <T extends RelNode> Predicate<T> hasCollation(RelCollation expected) {
+        return (node) -> {
+            RelCollation collation = TraitUtils.collation(node);
+            return expected.equals(collation);
+        };
+    }
+
     interface TestVisitor {
         void visit(RelNode node, int ordinal, RelNode parent);
     }
@@ -485,7 +519,7 @@ public abstract class AbstractPlannerTest extends 
IgniteAbstractTest {
     ) throws Exception {
         IgniteRel plan = physicalPlan(sql, schemas, hintStrategies, params, 
null, disabledRules);
 
-        String planString = RelOptUtil.dumpPlan("", plan, 
SqlExplainFormat.TEXT, SqlExplainLevel.ALL_ATTRIBUTES);
+        String planString = RelOptUtil.dumpPlan("", plan, 
SqlExplainFormat.TEXT, DEFAULT_EXPLAIN_LEVEL);
         log.info("statement: {}\n{}", sql, planString);
 
         checkSplitAndSerialization(plan, schemas);
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
index d2b0e8a423..c41b377775 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
@@ -31,6 +31,7 @@ import 
org.apache.ignite.internal.sql.engine.rel.IgniteCorrelatedNestedLoopJoin;
 import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
 import org.apache.ignite.internal.sql.engine.rel.IgniteLimit;
 import org.apache.ignite.internal.sql.engine.rel.IgniteMergeJoin;
+import org.apache.ignite.internal.sql.engine.rel.IgniteProject;
 import org.apache.ignite.internal.sql.engine.rel.IgniteSort;
 import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
 import 
org.apache.ignite.internal.sql.engine.rel.agg.IgniteColocatedHashAggregate;
@@ -118,9 +119,8 @@ public class AggregatePlannerTest extends 
AbstractAggregatePlannerTest {
         checkSimpleAggWithGroupByHash(TestCase.CASE_5A);
         checkSimpleAggWithGroupByHash(TestCase.CASE_5B);
 
-        // TODO replace with calls to test methods after 
https://issues.apache.org/jira/browse/IGNITE-20083 is resolved
-        assumeRun("checkSimpleAggWithGroupByHash", TestCase.CASE_6A);
-        assumeRun("checkSimpleAggWithGroupByHash", TestCase.CASE_6B);
+        checkSimpleAggWithGroupByHash(TestCase.CASE_6A);
+        checkSimpleAggWithGroupByHash(TestCase.CASE_6B);
 
         checkSimpleAggWithColocatedGroupByHash(TestCase.CASE_5C);
         checkSimpleAggWithColocatedGroupByHash(TestCase.CASE_5D);
@@ -250,22 +250,8 @@ public class AggregatePlannerTest extends 
AbstractAggregatePlannerTest {
     public void distinctAggregateInWhereClause() throws Exception {
         checkGroupWithNoAggregateSingle(TestCase.CASE_15);
 
-        assertPlan(TestCase.CASE_15A,
-                nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class)
-                        .and(not(hasAggregate()))
-                        .and(hasGroups())
-                        .and(input(isInstanceOf(IgniteExchange.class)
-                                .and(input(isTableScan("TEST")))
-                        ))
-                ));
-        assertPlan(TestCase.CASE_15B,
-                nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class)
-                        .and(not(hasAggregate()))
-                        .and(hasGroups())
-                        .and(input(isInstanceOf(IgniteExchange.class)
-                                .and(input(isTableScan("TEST")))
-                        ))
-                ));
+        checkGroupWithNoAggregateHash(TestCase.CASE_15A);
+        checkGroupWithNoAggregateHash(TestCase.CASE_15B);
     }
 
     /**
@@ -367,14 +353,14 @@ public class AggregatePlannerTest extends 
AbstractAggregatePlannerTest {
      */
     @Test
     public void aggregateWithOrderByGroupColumns() throws Exception {
-        checkGroupsWithOrderByGroupColumnsSingle(TestCase.CASE_19_1, 
TraitUtils.createCollation(List.of(0, 1)));
-        checkGroupsWithOrderByGroupColumnsSingle(TestCase.CASE_19_2, 
TraitUtils.createCollation(List.of(1, 0)));
+        checkGroupsWithOrderByGroupColumnsSingle(TestCase.CASE_19_1, 
TraitUtils.createCollation(List.of(0)));
+        checkGroupsWithOrderByGroupColumnsSingle(TestCase.CASE_19_2, 
TraitUtils.createCollation(List.of(1)));
 
-        checkGroupsWithOrderByGroupColumnsHash(TestCase.CASE_19_1A, 
TraitUtils.createCollation(List.of(0, 1)));
-        checkGroupsWithOrderByGroupColumnsHash(TestCase.CASE_19_2A, 
TraitUtils.createCollation(List.of(1, 0)));
+        checkGroupsWithOrderByGroupColumnsHash(TestCase.CASE_19_1A, 
TraitUtils.createCollation(List.of(0)));
+        checkGroupsWithOrderByGroupColumnsHash(TestCase.CASE_19_2A, 
TraitUtils.createCollation(List.of(1)));
 
-        checkGroupsWithOrderByGroupColumnsHash(TestCase.CASE_19_1B, 
TraitUtils.createCollation(List.of(0, 1)));
-        checkGroupsWithOrderByGroupColumnsHash(TestCase.CASE_19_2B, 
TraitUtils.createCollation(List.of(1, 0)));
+        checkGroupsWithOrderByGroupColumnsHash(TestCase.CASE_19_1B, 
TraitUtils.createCollation(List.of(0)));
+        checkGroupsWithOrderByGroupColumnsHash(TestCase.CASE_19_2B, 
TraitUtils.createCollation(List.of(1)));
     }
 
     /**
@@ -382,32 +368,10 @@ public class AggregatePlannerTest extends 
AbstractAggregatePlannerTest {
      */
     @Test
     public void aggregateWithGroupBySubsetOrderByColumns() throws Exception {
-        assertPlan(TestCase.CASE_20,
-                isInstanceOf(IgniteSort.class)
-                        .and(s -> 
s.collation().equals(TraitUtils.createCollation(List.of(0, 1, 2))))
-                        
.and(nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class)
-                                .and(input(isTableScan("TEST")))
-                        ))
-        );
+        checkGroupsWithOrderByGroupColumnsSingle(TestCase.CASE_20, 
TraitUtils.createCollation(List.of(0, 1, 2)));
 
-        assertPlan(TestCase.CASE_20A,
-                isInstanceOf(IgniteSort.class)
-                        .and(s -> 
s.collation().equals(TraitUtils.createCollation(List.of(0, 1, 2))))
-                        
.and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
-                                .and(input(isInstanceOf(IgniteExchange.class)
-                                        .and(input(isTableScan("TEST")))
-                                ))
-                        ))
-        );
-        assertPlan(TestCase.CASE_20B,
-                isInstanceOf(IgniteSort.class)
-                        .and(s -> 
s.collation().equals(TraitUtils.createCollation(List.of(0, 1, 2))))
-                        
.and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
-                                .and(input(isInstanceOf(IgniteExchange.class)
-                                        .and(input(isTableScan("TEST")))
-                                ))
-                        ))
-        );
+        checkGroupsWithOrderByGroupColumnsHash(TestCase.CASE_20A, 
TraitUtils.createCollation(List.of(0, 1, 2)));
+        checkGroupsWithOrderByGroupColumnsHash(TestCase.CASE_20B, 
TraitUtils.createCollation(List.of(0, 1, 2)));
     }
 
 
@@ -470,7 +434,7 @@ public class AggregatePlannerTest extends 
AbstractAggregatePlannerTest {
         Predicate<AggregateCall> countReduce = (a) ->
                 Objects.equals(a.getAggregation().getName(), "$SUM0") && 
a.getArgList().equals(List.of(1));
 
-        Predicate<RelNode> nonColocatedGroupBy = 
hasChildThat(isInstanceOf(IgniteReduceHashAggregate.class)
+        Predicate<RelNode> nonColocatedGroupBy = 
nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class)
                 .and(in -> 
hasAggregates(countReduce).test(in.getAggregateCalls()))
                 .and(input(isInstanceOf(IgniteExchange.class)
                         .and(input(isInstanceOf(IgniteMapHashAggregate.class)
@@ -497,17 +461,37 @@ public class AggregatePlannerTest extends 
AbstractAggregatePlannerTest {
     /** Validate that we choose single phase AVG aggregate for AVG by default. 
*/
     @Test
     public void avgAgg() throws Exception {
-        Predicate<AggregateCall> countMap = (a) -> 
Objects.equals(a.getAggregation().getName(), "AVG") && 
a.getArgList().equals(List.of(1));
+        Predicate<AggregateCall> colocated = (a) ->
+                Objects.equals(a.getAggregation().getName(), "AVG") && 
a.getArgList().equals(List.of(1));
 
-        Predicate<IgniteColocatedHashAggregate> nonColocated = 
isInstanceOf(IgniteColocatedHashAggregate.class)
-                .and(in -> hasAggregates(countMap).test(in.getAggCallList()))
-                .and(input(isInstanceOf(IgniteExchange.class)
-                        .and(hasDistribution(single()))));
+        Predicate<AggregateCall> sumMap = (a) ->
+                Objects.equals(a.getAggregation().getName(), "SUM") && 
a.getArgList().equals(List.of(1));
+
+        Predicate<AggregateCall> countMap = (a) ->
+                Objects.equals(a.getAggregation().getName(), "COUNT") && 
a.getArgList().equals(List.of(1));
+
+        Predicate<AggregateCall> sumReduce = (a) ->
+                Objects.equals(a.getAggregation().getName(), "SUM") && 
a.getArgList().equals(List.of(1));
+
+        Predicate<AggregateCall> sum0Reduce = (a) ->
+                Objects.equals(a.getAggregation().getName(), "$SUM0") && 
a.getArgList().equals(List.of(2));
+
+        Predicate<RelNode> nonColocated = 
hasChildThat(isInstanceOf(IgniteReduceHashAggregate.class)
+                .and(in -> hasAggregates(sumReduce, 
sum0Reduce).test(in.getAggregateCalls()))
+                .and(input(isInstanceOf(IgniteProject.class)
+                        .and(input(isInstanceOf(IgniteExchange.class)
+                                .and(hasDistribution(single()))
+                                
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
+                                                .and(in -> 
hasAggregates(sumMap, countMap).test(in.getAggCallList()))
+                                        )
+                                ))
+                        ))));
 
         Predicate<IgniteExchange> colocatedGroupBy = 
isInstanceOf(IgniteExchange.class)
                 .and(hasDistribution(single()))
                 .and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
-                        .and(in -> 
hasAggregates(countMap).test(in.getAggCallList()))
+                        .and(in -> 
hasAggregates(colocated).test(in.getAggCallList()))
+                        .and(hasGroups())
                         .and(input(isTableScan("TEST")))
                 ));
 
@@ -538,20 +522,15 @@ public class AggregatePlannerTest extends 
AbstractAggregatePlannerTest {
         checkCountDistinctHash(TestCase.CASE_24_1B);
         checkCountDistinctHash(TestCase.CASE_24_1D);
 
-        Predicate<RelNode> colocated = 
nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
-                .and(hasNoGroupSets(IgniteReduceSortAggregate::getGroupSets))
+        Predicate<RelNode> colocated = 
nodeOrAnyChild(isInstanceOf(IgniteColocatedSortAggregate.class)
+                
.and(hasNoGroupSets(IgniteColocatedSortAggregate::getGroupSets))
                 .and(input(isInstanceOf(IgniteExchange.class)
                         .and(hasDistribution(single())
-                                
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
-                                        
.and(hasNoGroupSets(IgniteMapSortAggregate::getGroupSets))
-                                        
.and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
-                                                
.and(hasGroupSets(IgniteColocatedHashAggregate::getGroupSets, 1))
-                                        ))
+                                
.and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
+                                        
.and(hasGroupSets(IgniteColocatedHashAggregate::getGroupSets, 1))
                                 ))
-                        )
-                ))
-
-        );
+                        ))
+                ));
 
         assertPlan(TestCase.CASE_24_1C, colocated);
         assertPlan(TestCase.CASE_24_1E, colocated);
@@ -640,39 +619,39 @@ public class AggregatePlannerTest extends 
AbstractAggregatePlannerTest {
         assertPlan(testCase,
                 isInstanceOf(IgniteColocatedHashAggregate.class)
                         .and(hasAggregate())
-                        .and(not(hasDistinctAggregate()))
-                        
.and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
-                                .and(hasGroups())
-                                .and(input(isTableScan("TEST")))
-                        ))
+                        .and(hasDistinctAggregate())
+                        .and(hasGroups())
+                        .and(input(isTableScan("TEST")))
         );
     }
 
     private void checkDistinctAggHash(TestCase testCase) throws Exception {
         assertPlan(testCase,
-                isInstanceOf(IgniteColocatedHashAggregate.class)
+                nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class)
                         .and(hasAggregate())
                         .and(not(hasDistinctAggregate()))
-                        
.and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
+                        
.and(input(isInstanceOf(IgniteReduceHashAggregate.class)
+                                .and(not(hasAggregate()))
                                 .and(hasGroups())
                                 .and(input(isInstanceOf(IgniteExchange.class)
-                                        .and(input(isTableScan("TEST")))
+                                        
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
+                                                .and(not(hasAggregate()))
+                                                
.and(input(isTableScan("TEST")))
+                                        ))
                                 ))
                         ))
-        );
+                ));
     }
 
     private void checkColocatedDistinctAggHash(TestCase testCase) throws 
Exception {
         assertPlan(testCase,
-                nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
+                nodeOrAnyChild(isInstanceOf(IgniteColocatedSortAggregate.class)
+                        .and(hasAggregate())
+                        .and(not(hasDistinctAggregate()))
                         .and(input(isInstanceOf(IgniteExchange.class)
-                                
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
-                                        .and(hasAggregate())
-                                        .and(not(hasDistinctAggregate()))
-                                        
.and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
-                                                .and(hasGroups())
-                                                
.and(input(isTableScan("TEST")))
-                                        ))
+                                
.and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
+                                        .and(hasGroups())
+                                        .and(input(isTableScan("TEST")))
                                 ))
                         ))
                 ));
@@ -688,16 +667,25 @@ public class AggregatePlannerTest extends 
AbstractAggregatePlannerTest {
         );
     }
 
+
     private void checkDistinctAggWithGroupByHash(TestCase testCase) throws 
Exception {
         assertPlan(testCase,
                 nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class)
-                        .and(hasDistinctAggregate())
+                        .and(hasAggregate())
+                        .and(not(hasDistinctAggregate()))
                         .and(hasGroups())
-                        .and(input(isInstanceOf(IgniteExchange.class)
-                                .and(input(isTableScan("TEST")))
+                        
.and(input(isInstanceOf(IgniteReduceHashAggregate.class)
+                                .and(not(hasAggregate()))
+                                .and(hasGroups())
+                                .and(input(isInstanceOf(IgniteExchange.class)
+                                        
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
+                                                .and(not(hasAggregate()))
+                                                .and(hasGroups())
+                                                
.and(input(isTableScan("TEST")))
+                                        ))
+                                ))
                         ))
-                )
-        );
+                ));
     }
 
     private void checkDistinctAggWithColocatedGroupByHash(TestCase testCase) 
throws Exception {
@@ -755,11 +743,15 @@ public class AggregatePlannerTest extends 
AbstractAggregatePlannerTest {
 
     private void checkGroupWithNoAggregateHash(TestCase testCase) throws 
Exception {
         assertPlan(testCase,
-                nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class)
+                nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class)
                         .and(not(hasAggregate()))
                         .and(hasGroups())
                         .and(input(isInstanceOf(IgniteExchange.class)
-                                .and(input(isTableScan("TEST")))
+                                
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
+                                        .and(not(hasAggregate()))
+                                        .and(hasGroups())
+                                        .and(input(isTableScan("TEST")))
+                                ))
                         ))
                 ));
     }
@@ -812,9 +804,9 @@ public class AggregatePlannerTest extends 
AbstractAggregatePlannerTest {
 
     private void checkGroupsWithOrderByGroupColumnsSingle(TestCase testCase, 
RelCollation collation) throws Exception {
         assertPlan(testCase,
-                isInstanceOf(IgniteColocatedSortAggregate.class)
-                        .and(input(isInstanceOf(IgniteSort.class)
-                                .and(s -> s.collation().equals(collation))
+                isInstanceOf(IgniteSort.class)
+                        .and(s -> s.collation().equals(collation))
+                        
.and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
                                 .and(input(isTableScan("TEST")))
                         ))
         );
@@ -822,12 +814,15 @@ public class AggregatePlannerTest extends 
AbstractAggregatePlannerTest {
 
     private void checkGroupsWithOrderByGroupColumnsHash(TestCase testCase, 
RelCollation collation) throws Exception {
         assertPlan(testCase,
-                isInstanceOf(IgniteColocatedSortAggregate.class)
-                        .and(input(isInstanceOf(IgniteExchange.class)
-                                .and(hasDistribution(single()))
-                                .and(input(isInstanceOf(IgniteSort.class)
-                                        .and(s -> 
s.collation().equals(collation))
-                                        .and(input(isTableScan("TEST")))
+                isInstanceOf(IgniteSort.class)
+                        .and(s -> s.collation().equals(collation))
+                        .and(input(isInstanceOf(IgniteProject.class)
+                                
.and(input(isInstanceOf(IgniteReduceHashAggregate.class)
+                                        
.and(input(isInstanceOf(IgniteExchange.class)
+                                                
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
+                                                        
.and(input(isTableScan("TEST")))
+                                                ))
+                                        ))
                                 ))
                         ))
         );
@@ -881,31 +876,32 @@ public class AggregatePlannerTest extends 
AbstractAggregatePlannerTest {
     }
 
     private void 
checkDerivedCollationWithOrderBySubsetOfGroupColumnsSingle(TestCase testCase) 
throws Exception {
-        RelCollation requiredCollation = RelCollations.of(
-                TraitUtils.createFieldCollation(1, Collation.DESC_NULLS_FIRST),
-                TraitUtils.createFieldCollation(0, Collation.ASC_NULLS_LAST)
+        RelCollation outputCollation = RelCollations.of(
+                TraitUtils.createFieldCollation(1, Collation.DESC_NULLS_FIRST)
         );
 
-        assertPlan(testCase, isInstanceOf(IgniteColocatedSortAggregate.class)
-                .and(hasCollation(requiredCollation))
-                .and(input(isInstanceOf(IgniteSort.class)
-                        .and(hasCollation(requiredCollation))
-                )));
+        assertPlan(testCase,
+                isInstanceOf(IgniteSort.class)
+                        .and(hasCollation(outputCollation))
+                        
.and(input(isInstanceOf(IgniteColocatedHashAggregate.class)))
+        );
     }
 
     private void 
checkDerivedCollationWithOrderBySubsetOfGroupColumnsHash(TestCase testCase) 
throws Exception {
-        RelCollation requiredCollation = RelCollations.of(
-                TraitUtils.createFieldCollation(1, Collation.DESC_NULLS_FIRST),
-                TraitUtils.createFieldCollation(0, Collation.ASC_NULLS_LAST)
+        RelCollation outputCollation = RelCollations.of(
+                TraitUtils.createFieldCollation(1, Collation.DESC_NULLS_FIRST)
         );
 
-        assertPlan(testCase, isInstanceOf(IgniteColocatedSortAggregate.class)
-                .and(hasCollation(requiredCollation))
-                .and(input(isInstanceOf(IgniteExchange.class)
-                        .and(hasDistribution(single()))
-                        .and(input(isInstanceOf(IgniteSort.class)
-                                .and(hasCollation(requiredCollation))
-                        ))
-                )));
+        assertPlan(testCase,
+                isInstanceOf(IgniteSort.class)
+                        .and(hasCollation(outputCollation))
+                        .and(input(isInstanceOf(IgniteProject.class)
+                                
.and(input(isInstanceOf(IgniteReduceHashAggregate.class)
+                                        
.and(input(isInstanceOf(IgniteExchange.class)
+                                                .and(hasDistribution(single()))
+                                                
.and(input(isInstanceOf(IgniteMapHashAggregate.class)))
+                                        ))
+                                ))
+                        )));
     }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java
index 93b63b1c0d..fec1474ed0 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java
@@ -591,11 +591,9 @@ public class ColocatedHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
         assertPlan(testCase,
                 isInstanceOf(IgniteColocatedHashAggregate.class)
                         .and(hasAggregate())
-                        .and(not(hasDistinctAggregate()))
-                        
.and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
-                                .and(hasGroups())
-                                .and(input(isTableScan("TEST")))
-                        )),
+                        .and(hasDistinctAggregate())
+                        .and(hasGroups())
+                        .and(input(isTableScan("TEST"))),
                 disableRules
         );
     }
@@ -604,12 +602,10 @@ public class ColocatedHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
         assertPlan(testCase,
                 isInstanceOf(IgniteColocatedHashAggregate.class)
                         .and(hasAggregate())
-                        .and(not(hasDistinctAggregate()))
-                        
.and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
-                                .and(hasGroups())
-                                .and(input(isInstanceOf(IgniteExchange.class)
-                                        .and(input(isTableScan("TEST")))
-                                ))
+                        .and(hasDistinctAggregate())
+                        .and(hasGroups())
+                        .and(input(isInstanceOf(IgniteExchange.class)
+                                .and(input(isTableScan("TEST")))
                         )),
                 disableRules
         );
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java
index b09e421d5b..8230d06a47 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java
@@ -141,22 +141,21 @@ public class MapReduceHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
         checkDistinctAggWithGroupBySingle(TestCase.CASE_7_2);
         checkDistinctAggWithGroupBySingle(TestCase.CASE_7_3);
 
-        // TODO replace with calls to test methods after 
https://issues.apache.org/jira/browse/IGNITE-20083 is fixed
-        assumeRun("checkDistinctAggWithGroupByHash", TestCase.CASE_7_1A);
-        assumeRun("checkDistinctAggWithGroupByHash", TestCase.CASE_7_2A);
-        assumeRun("checkDistinctAggWithGroupByHash", TestCase.CASE_7_3A);
+        checkDistinctAggWithGroupByHash(TestCase.CASE_7_1A);
+        checkDistinctAggWithGroupByHash(TestCase.CASE_7_2A);
+        checkDistinctAggWithGroupByHash(TestCase.CASE_7_3A);
 
-        assumeRun("checkDistinctAggWithGroupByHash", TestCase.CASE_7_1B);
-        assumeRun("checkDistinctAggWithGroupByHash", TestCase.CASE_7_2B);
-        assumeRun("checkDistinctAggWithGroupByHash", TestCase.CASE_7_3B);
+        checkDistinctAggWithGroupByHash(TestCase.CASE_7_1B);
+        checkDistinctAggWithGroupByHash(TestCase.CASE_7_2B);
+        checkDistinctAggWithGroupByHash(TestCase.CASE_7_3B);
 
-        assumeRun("checkDistinctAggWithColocatedGroupByHash", 
TestCase.CASE_7_1C);
-        assumeRun("checkDistinctAggWithColocatedGroupByHash", 
TestCase.CASE_7_2C);
-        assumeRun("checkDistinctAggWithColocatedGroupByHash", 
TestCase.CASE_7_3C);
+        checkDistinctAggWithColocatedGroupByHash(TestCase.CASE_7_1C);
+        checkDistinctAggWithColocatedGroupByHash(TestCase.CASE_7_2C);
+        checkDistinctAggWithColocatedGroupByHash(TestCase.CASE_7_3C);
 
-        assumeRun("checkDistinctAggWithColocatedGroupByHash", 
TestCase.CASE_7_1D);
-        assumeRun("checkDistinctAggWithColocatedGroupByHash", 
TestCase.CASE_7_2D);
-        assumeRun("checkDistinctAggWithColocatedGroupByHash", 
TestCase.CASE_7_3D);
+        checkDistinctAggWithColocatedGroupByHash(TestCase.CASE_7_1D);
+        checkDistinctAggWithColocatedGroupByHash(TestCase.CASE_7_2D);
+        checkDistinctAggWithColocatedGroupByHash(TestCase.CASE_7_3D);
     }
 
     /**
@@ -214,11 +213,11 @@ public class MapReduceHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
     @Test
     public void distinctWithoutAggregate() throws Exception {
         checkGroupWithNoAggregateSingle(TestCase.CASE_12);
-        // TODO replace with calls to test methods after 
https://issues.apache.org/jira/browse/IGNITE-20083 is resolved
-        assumeRun("checkGroupWithNoAggregateHash", TestCase.CASE_12A);
-        assumeRun("checkGroupWithNoAggregateHash", TestCase.CASE_12B);
-        assumeRun("checkGroupWithNoAggregateHash", TestCase.CASE_12C);
-        assumeRun("checkGroupWithNoAggregateHash", TestCase.CASE_12D);
+
+        checkGroupWithNoAggregateHash(TestCase.CASE_12A);
+        checkGroupWithNoAggregateHash(TestCase.CASE_12B);
+        checkGroupWithNoAggregateHash(TestCase.CASE_12C);
+        checkGroupWithNoAggregateHash(TestCase.CASE_12D);
     }
 
     /**
@@ -227,11 +226,11 @@ public class MapReduceHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
     @Test
     public void distinctWithoutAggregateUseIndex() throws Exception {
         checkGroupWithNoAggregateSingle(TestCase.CASE_13);
-        // TODO replace with calls to test methods after 
https://issues.apache.org/jira/browse/IGNITE-20083 is resolved
-        assumeRun("checkGroupWithNoAggregateHash", TestCase.CASE_13A);
-        assumeRun("checkGroupWithNoAggregateHash", TestCase.CASE_13B);
-        assumeRun("checkGroupWithNoAggregateHash", TestCase.CASE_13C);
-        assumeRun("checkGroupWithNoAggregateHash", TestCase.CASE_13D);
+
+        checkGroupWithNoAggregateHash(TestCase.CASE_13A);
+        checkGroupWithNoAggregateHash(TestCase.CASE_13B);
+        checkGroupWithNoAggregateHash(TestCase.CASE_13C);
+        checkGroupWithNoAggregateHash(TestCase.CASE_13D);
     }
 
     /**
@@ -436,15 +435,14 @@ public class MapReduceHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
                 ))
         );
 
-        // TODO Replace with uncommented code after 
https://issues.apache.org/jira/browse/IGNITE-20083 is resolved
-        assumeRun("", TestCase.CASE_21A);
-        assumeRun("", TestCase.CASE_21B);
-        /*
         assertPlan(TestCase.CASE_21A, 
nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
                 .and(input(0, subtreePredicate))
                 .and(input(1, subtreePredicate))
         ), disableRules);
-         */
+        assertPlan(TestCase.CASE_21B, 
nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
+                .and(input(0, subtreePredicate))
+                .and(input(1, subtreePredicate))
+        ), disableRules);
     }
 
     /**
@@ -708,6 +706,29 @@ public class MapReduceHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
         );
     }
 
+    private void checkDistinctAggWithColocatedGroupByHash(TestCase testCase) 
throws Exception {
+        assertPlan(testCase,
+                nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class)
+                        .and(hasAggregate())
+                        .and(not(hasDistinctAggregate()))
+                        .and(input(isInstanceOf(IgniteMapHashAggregate.class)
+                                
.and(input(isInstanceOf(IgniteReduceHashAggregate.class)
+                                        .and(not(hasAggregate()))
+                                        .and(hasGroups())
+                                        
.and(input(isInstanceOf(IgniteExchange.class)
+                                                
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
+                                                        
.and(not(hasAggregate()))
+                                                        .and(hasGroups())
+                                                        
.and(input(isTableScan("TEST")))
+                                                ))
+                                        ))
+                                ))
+                        ))
+                ),
+                disableRules
+        );
+    }
+
     private void checkAggWithGroupByIndexColumnsSingle(TestCase testCase) 
throws Exception {
         assertPlan(testCase,
                 nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class)
@@ -787,10 +808,8 @@ public class MapReduceHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
                         .and(s -> s.collation().equals(collation))
                         .and(input(isInstanceOf(IgniteProject.class)
                                 
.and(input(isInstanceOf(IgniteReduceHashAggregate.class)
-                                        
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
-                                                // TODO: 
https://issues.apache.org/jira/browse/IGNITE-20095
-                                                // Why can't Map be pushed 
down to under 'exchange'.
-                                                
.and(input(isInstanceOf(IgniteExchange.class)
+                                        
.and(input(isInstanceOf(IgniteExchange.class)
+                                                
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
                                                         
.and(input(isTableScan("TEST")))
                                                 ))
                                         ))
@@ -878,12 +897,9 @@ public class MapReduceHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
                         .and(hasCollation(outputCollation))
                         .and(input(isInstanceOf(IgniteProject.class)
                                 
.and(input(isInstanceOf(IgniteReduceHashAggregate.class)
-                                        
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
-                                                // TODO: 
https://issues.apache.org/jira/browse/IGNITE-20095
-                                                // Why can't Map be pushed 
down to under 'exchange'.
-                                                
.and(input(isInstanceOf(IgniteExchange.class)
-                                                        
.and(hasDistribution(IgniteDistributions.single()))
-                                                ))
+                                        
.and(input(isInstanceOf(IgniteExchange.class)
+                                                
.and(hasDistribution(IgniteDistributions.single()))
+                                                
.and(input(isInstanceOf(IgniteMapHashAggregate.class)))
                                         ))
                                 ))
                         )),
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java
index 34925da593..14d8da0739 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java
@@ -522,7 +522,7 @@ public class MapReduceSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
         Predicate<IgniteReduceSortAggregate> inputAgg = 
isInstanceOf(IgniteReduceSortAggregate.class)
                 .and(hasGroupSets(IgniteReduceSortAggregate::getGroupSets, 0))
                 .and(hasCollation(RelCollations.of(0)))
-                .and(input(isInstanceOf(IgniteMapSortAggregate.class)
+                        .and(input(isInstanceOf(IgniteMapSortAggregate.class)
                         .and(hasCollation(RelCollations.of(1)))
                         .and(hasGroupSets(Aggregate::getGroupSets, 1))
                 ));
@@ -846,11 +846,9 @@ public class MapReduceSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
     private void checkGroupsWithOrderBySubsetOfGroupColumnsHash(TestCase 
testCase, RelCollation collation) throws Exception {
         assertPlan(testCase,
                 nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
-                        .and(input(isInstanceOf(IgniteMapSortAggregate.class)
-                                // TODO: 
https://issues.apache.org/jira/browse/IGNITE-20095
-                                // Why can't Map be pushed down to under 
'exchange'.
-                                .and(input(isInstanceOf(IgniteExchange.class)
-                                        .and(hasDistribution(single()))
+                        .and(input(isInstanceOf(IgniteExchange.class)
+                                .and(hasDistribution(single()))
+                                
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
                                         
.and(input(isInstanceOf(IgniteSort.class)
                                                 .and(s -> 
s.collation().equals(collation))
                                                 
.and(input(isTableScan("TEST")))
@@ -868,8 +866,6 @@ public class MapReduceSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
                         .and(input(isInstanceOf(IgniteProject.class)
                                 
.and(input(isInstanceOf(IgniteReduceSortAggregate.class)
                                         
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
-                                                // TODO: 
https://issues.apache.org/jira/browse/IGNITE-20095
-                                                // Why can't Map be pushed 
down to under 'exchange'.
                                                 
.and(input(isInstanceOf(IgniteSort.class)
                                                         .and(s -> 
s.collation().equals(collation))
                                                         
.and(input(isTableScan("TEST")
@@ -888,11 +884,9 @@ public class MapReduceSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
                         .and(s -> 
s.collation().equals(TraitUtils.createCollation(List.of(0, 1, 2))))
                         .and(input(isInstanceOf(IgniteProject.class)
                                 
.and(input(isInstanceOf(IgniteReduceSortAggregate.class)
-                                        
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
-                                                // TODO: 
https://issues.apache.org/jira/browse/IGNITE-20095
-                                                // Why can't Map be pushed 
down to under 'exchange'.
-                                                
.and(input(isInstanceOf(IgniteExchange.class)
-                                                        
.and(hasDistribution(single()))
+                                        
.and(input(isInstanceOf(IgniteExchange.class)
+                                                .and(hasDistribution(single()))
+                                                
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
                                                         
.and(input(isInstanceOf(IgniteSort.class)
                                                                 .and(s -> 
s.collation().equals(collation))
                                                                 
.and(input(isTableScan("TEST")
@@ -994,11 +988,9 @@ public class MapReduceSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
                 isInstanceOf(IgniteProject.class)
                         .and(hasCollation(outputCollation))
                         
.and(input(isInstanceOf(IgniteReduceSortAggregate.class)
-                                
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
-                                        // TODO: 
https://issues.apache.org/jira/browse/IGNITE-20095
-                                        // Why can't Map be pushed down to 
under 'exchange'.
-                                        
.and(input(isInstanceOf(IgniteExchange.class)
-                                                .and(hasDistribution(single()))
+                                .and(input(isInstanceOf(IgniteExchange.class)
+                                        .and(hasDistribution(single()))
+                                        
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
                                                 
.and(input(isInstanceOf(IgniteSort.class)
                                                         
.and(hasCollation(requiredCollation))
                                                 ))
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/TpchQueryPlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/TpchQueryPlannerTest.java
new file mode 100644
index 0000000000..6fb4329ea9
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/TpchQueryPlannerTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.planner;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import com.google.common.io.CharStreams;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
+import org.apache.ignite.internal.sql.engine.framework.TestCluster;
+import org.apache.ignite.internal.sql.engine.framework.TestNode;
+import org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan;
+import org.apache.ignite.internal.sql.engine.util.Cloner;
+import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.sql.engine.util.tpch.TpchHelper;
+import org.apache.ignite.internal.sql.engine.util.tpch.TpchTables;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests ensures a planner generates optimal plan for TPC-H queries.
+ *
+ * @see org.apache.ignite.internal.sql.engine.benchmarks.TpchParseBenchmark
+ */
+// TODO https://issues.apache.org/jira/browse/IGNITE-21986 validate other 
query plans and make test parameterized.
+public class TpchQueryPlannerTest extends AbstractPlannerTest {
+    private static TestCluster CLUSTER;
+
+    @BeforeAll
+    static void startCluster() {
+        CLUSTER = TestBuilders.cluster().nodes("N1").build();
+        CLUSTER.start();
+
+        TestNode node = CLUSTER.node("N1");
+
+        node.initSchema(TpchTables.LINEITEM.ddlScript());
+    }
+
+    @AfterAll
+    static void stopCluster() throws Exception {
+        CLUSTER.stop();
+        CLUSTER = null;
+    }
+
+    @Test
+    public void tpchTest_q1() {
+        validateQueryPlan("1");
+    }
+
+    private static void validateQueryPlan(String queryId) {
+        TestNode node = CLUSTER.node("N1");
+
+        MultiStepPlan plan = (MultiStepPlan) 
node.prepare(TpchHelper.getQuery(queryId));
+
+        String actualPlan = RelOptUtil.toString(Cloner.clone(plan.root(), 
Commons.cluster()), SqlExplainLevel.DIGEST_ATTRIBUTES);
+        String expectedPlan = getQueryPlan(queryId);
+
+        assertEquals(expectedPlan, actualPlan);
+    }
+
+    /**
+     * Loads query plan for provided TPC-H query id.
+     *
+     * @see TpchHelper#getQuery(String) for query id details.
+     */
+    public static String getQueryPlan(String queryId) {
+        // variant query ends with "v"
+        boolean variant = queryId.endsWith("v");
+        int numericId;
+
+        if (variant) {
+            String idString = queryId.substring(0, queryId.length() - 1);
+            numericId = Integer.parseInt(idString);
+        } else {
+            numericId = Integer.parseInt(queryId);
+        }
+
+        if (variant) {
+            var variantQueryFile = String.format("tpch/plan/variant_q%d.plan", 
numericId);
+            return loadFromResource(variantQueryFile);
+        } else {
+            var queryFile = String.format("tpch/plan/q%s.plan", numericId);
+            return loadFromResource(queryFile);
+        }
+    }
+
+    static String loadFromResource(String resource) {
+        try (InputStream is = 
TpchHelper.class.getClassLoader().getResourceAsStream(resource)) {
+            if (is == null) {
+                throw new IllegalArgumentException("Resource does not exist: " 
+ resource);
+            }
+            try (InputStreamReader reader = new InputStreamReader(is, 
StandardCharsets.UTF_8)) {
+                return CharStreams.toString(reader);
+            }
+        } catch (IOException e) {
+            throw new UncheckedIOException("I/O operation failed: " + 
resource, e);
+        }
+    }
+}
diff --git a/modules/sql-engine/src/test/resources/mapping/correlated.test 
b/modules/sql-engine/src/test/resources/mapping/correlated.test
index 09750faf36..779cd1ac5b 100644
--- a/modules/sql-engine/src/test/resources/mapping/correlated.test
+++ b/modules/sql-engine/src/test/resources/mapping/correlated.test
@@ -211,7 +211,7 @@ Fragment#0 root
     Project
       CorrelatedNestedLoopJoin
         Receiver(sourceFragment=1, exchange=1, distribution=single)
-        ColocatedHashAggregate
+        ReduceHashAggregate
           Receiver(sourceFragment=2, exchange=2, distribution=single)
 
 Fragment#2 correlated
@@ -222,7 +222,8 @@ Fragment#2 correlated
   pruningMetadata: [3=[{0=$cor0.ID}]]
   tree:
     Sender(targetFragment=0, exchange=2, distribution=single)
-      TableScan(name=PUBLIC.T2_N0N1N2, source=3, partitions=3, 
distribution=random)
+      MapHashAggregate
+        TableScan(name=PUBLIC.T2_N0N1N2, source=3, partitions=3, 
distribution=random)
 
 Fragment#1
   targetNodes: [N0]
@@ -250,7 +251,7 @@ Fragment#0 root
           Project
             CorrelatedNestedLoopJoin
               Receiver(sourceFragment=2, exchange=2, distribution=single)
-              ColocatedHashAggregate
+              ReduceHashAggregate
                 Receiver(sourceFragment=3, exchange=3, distribution=single)
 
 Fragment#3 correlated
@@ -261,7 +262,8 @@ Fragment#3 correlated
   pruningMetadata: [4=[{0=$cor1.ID}]]
   tree:
     Sender(targetFragment=0, exchange=3, distribution=single)
-      TableScan(name=PUBLIC.T2_N0N1N2, source=4, partitions=3, 
distribution=random)
+      MapHashAggregate
+        TableScan(name=PUBLIC.T2_N0N1N2, source=4, partitions=3, 
distribution=random)
 
 Fragment#2 correlated
   targetNodes: [N0]
@@ -302,7 +304,7 @@ Fragment#0 root
           Project
             CorrelatedNestedLoopJoin
               Receiver(sourceFragment=2, exchange=2, distribution=single)
-              ColocatedHashAggregate
+              ReduceHashAggregate
                 Receiver(sourceFragment=3, exchange=3, distribution=single)
 
 Fragment#3 correlated
@@ -313,7 +315,8 @@ Fragment#3 correlated
   pruningMetadata: [4=[{0=$cor0.ID}, {0=$cor2.ID}]]
   tree:
     Sender(targetFragment=0, exchange=3, distribution=single)
-      TableScan(name=PUBLIC.T2_N0N1N2, source=4, partitions=3, 
distribution=random)
+      MapHashAggregate
+        TableScan(name=PUBLIC.T2_N0N1N2, source=4, partitions=3, 
distribution=random)
 
 Fragment#2 correlated
   targetNodes: [N0]
diff --git 
a/modules/sql-engine/src/test/resources/mapping/test_partition_pruning.test 
b/modules/sql-engine/src/test/resources/mapping/test_partition_pruning.test
index 1451a4d63b..8fc500b503 100644
--- a/modules/sql-engine/src/test/resources/mapping/test_partition_pruning.test
+++ b/modules/sql-engine/src/test/resources/mapping/test_partition_pruning.test
@@ -122,7 +122,7 @@ Fragment#0 root
     Project
       CorrelatedNestedLoopJoin
         Receiver(sourceFragment=1, exchange=1, distribution=single)
-        ColocatedHashAggregate
+        ReduceHashAggregate
           Receiver(sourceFragment=2, exchange=2, distribution=single)
 
 Fragment#2 correlated
@@ -133,7 +133,8 @@ Fragment#2 correlated
   pruningMetadata: [3=[{0=$cor0.ID}]]
   tree:
     Sender(targetFragment=0, exchange=2, distribution=single)
-      TableScan(name=PUBLIC.T3_N1N2N3, source=3, partitions=3, 
distribution=random)
+      MapHashAggregate
+        TableScan(name=PUBLIC.T3_N1N2N3, source=3, partitions=3, 
distribution=random)
 
 Fragment#1
   targetNodes: [N0]
@@ -158,7 +159,7 @@ Fragment#0 root
     Project
       CorrelatedNestedLoopJoin
         Receiver(sourceFragment=1, exchange=1, distribution=single)
-        ColocatedHashAggregate
+        ReduceHashAggregate
           Receiver(sourceFragment=2, exchange=2, distribution=single)
 
 Fragment#2 correlated
@@ -169,7 +170,8 @@ Fragment#2 correlated
   pruningMetadata: [3=[{0=$cor0.ID}]]
   tree:
     Sender(targetFragment=0, exchange=2, distribution=single)
-      TableScan(name=PUBLIC.T2_N4N5, source=3, partitions=2, 
distribution=random)
+      MapHashAggregate
+        TableScan(name=PUBLIC.T2_N4N5, source=3, partitions=2, 
distribution=random)
 
 Fragment#1
   targetNodes: [N0]
diff --git a/modules/sql-engine/src/test/resources/mapping/union.test 
b/modules/sql-engine/src/test/resources/mapping/union.test
index 9b58561ed5..1ab8915466 100644
--- a/modules/sql-engine/src/test/resources/mapping/union.test
+++ b/modules/sql-engine/src/test/resources/mapping/union.test
@@ -4,32 +4,23 @@ SELECT /*+ 
DISABLE_RULE('ColocatedHashAggregateConverterRule')*/ * FROM
 ---
 Fragment#0 root
   executionNodes: [N1]
-  remoteFragments: [1, 2]
-  exchangeSourceNodes: {1=[N1], 2=[N1]}
+  remoteFragments: [1]
+  exchangeSourceNodes: {1=[N1]}
   tree:
     ReduceHashAggregate
-      MapHashAggregate
-        UnionAll
-          Receiver(sourceFragment=1, exchange=1, distribution=single)
-          Receiver(sourceFragment=2, exchange=2, distribution=single)
-
-Fragment#2
-  targetNodes: [N1]
-  executionNodes: [N1]
-  tables: [T2_N1]
-  partitions: {N1=[0:1]}
-  tree:
-    Sender(targetFragment=0, exchange=2, distribution=single)
-      TableScan(name=PUBLIC.T2_N1, source=3, partitions=1, 
distribution=affinity[table: T2_N1, columns: [ID]])
+      Receiver(sourceFragment=1, exchange=1, distribution=single)
 
 Fragment#1
   targetNodes: [N1]
   executionNodes: [N1]
-  tables: [T1_N1]
+  tables: [T1_N1, T2_N1]
   partitions: {N1=[0:1]}
   tree:
     Sender(targetFragment=0, exchange=1, distribution=single)
-      TableScan(name=PUBLIC.T1_N1, source=4, partitions=1, 
distribution=affinity[table: T1_N1, columns: [ID]])
+      MapHashAggregate
+        UnionAll
+          TableScan(name=PUBLIC.T1_N1, source=2, partitions=1, 
distribution=affinity[table: T1_N1, columns: [ID]])
+          TableScan(name=PUBLIC.T2_N1, source=3, partitions=1, 
distribution=affinity[table: T2_N1, columns: [ID]])
 ---
 
 N1
@@ -62,32 +53,23 @@ SELECT /*+ 
DISABLE_RULE('ColocatedHashAggregateConverterRule')*/ * FROM
 ---
 Fragment#0 root
   executionNodes: [N1]
-  remoteFragments: [1, 2]
-  exchangeSourceNodes: {1=[N1, N2], 2=[N1, N2]}
+  remoteFragments: [1]
+  exchangeSourceNodes: {1=[N1, N2]}
   tree:
     ReduceHashAggregate
-      MapHashAggregate
-        UnionAll
-          Receiver(sourceFragment=1, exchange=1, distribution=single)
-          Receiver(sourceFragment=2, exchange=2, distribution=single)
-
-Fragment#2
-  targetNodes: [N1]
-  executionNodes: [N1, N2]
-  tables: [T2_N1N2]
-  partitions: {N1=[0:2], N2=[1:2]}
-  tree:
-    Sender(targetFragment=0, exchange=2, distribution=single)
-      TableScan(name=PUBLIC.T2_N1N2, source=3, partitions=2, 
distribution=affinity[table: T2_N1N2, columns: [ID]])
+      Receiver(sourceFragment=1, exchange=1, distribution=single)
 
 Fragment#1
   targetNodes: [N1]
   executionNodes: [N1, N2]
-  tables: [T1_N1N2]
+  tables: [T1_N1N2, T2_N1N2]
   partitions: {N1=[0:2], N2=[1:2]}
   tree:
     Sender(targetFragment=0, exchange=1, distribution=single)
-      TableScan(name=PUBLIC.T1_N1N2, source=4, partitions=2, 
distribution=affinity[table: T1_N1N2, columns: [ID]])
+      MapHashAggregate
+        UnionAll
+          TableScan(name=PUBLIC.T1_N1N2, source=2, partitions=2, 
distribution=affinity[table: T1_N1N2, columns: [ID]])
+          TableScan(name=PUBLIC.T2_N1N2, source=3, partitions=2, 
distribution=affinity[table: T2_N1N2, columns: [ID]])
 ---
 
 N1
diff --git a/modules/sql-engine/src/test/resources/tpch/plan/q1.plan 
b/modules/sql-engine/src/test/resources/tpch/plan/q1.plan
new file mode 100644
index 0000000000..af8defab87
--- /dev/null
+++ b/modules/sql-engine/src/test/resources/tpch/plan/q1.plan
@@ -0,0 +1,7 @@
+Sort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC])
+  Project(inputs=[0..5], exprs=[[DECIMAL_DIVIDE($6, $7, 15, 2), 
DECIMAL_DIVIDE($8, $9, 15, 2), DECIMAL_DIVIDE($10, $11, 15, 2), 
CAST($12):BIGINT NOT NULL]])
+    ReduceHashAggregate(group=[{0, 1}], SUM_QTY=[SUM($2)], 
SUM_BASE_PRICE=[SUM($3)], SUM_DISC_PRICE=[SUM($4)], SUM_CHARGE=[SUM($5)], 
AVG_SUM6=[SUM($6)], AVG_SUM06=[$SUM0($7)], AVG_SUM8=[SUM($8)], 
AVG_SUM08=[$SUM0($9)], AVG_SUM10=[SUM($10)], AVG_SUM010=[$SUM0($11)], 
COUNT_12_MAP_SUM=[$SUM0($12)])
+      Project(inputs=[0..6], exprs=[[CAST($7):DECIMAL(32767, 0) NOT NULL, $8, 
CAST($9):DECIMAL(32767, 0) NOT NULL, $10, CAST($11):DECIMAL(32767, 0) NOT NULL, 
$12, $13]])
+        Exchange(distribution=[single])
+          MapHashAggregate(group=[{0, 1}], SUM_QTY=[SUM($2)], 
SUM_BASE_PRICE=[SUM($3)], SUM_DISC_PRICE=[SUM($4)], SUM_CHARGE=[SUM($5)], 
AVG_SUM6=[SUM($2)], AVG_COUNT6=[COUNT($2)], AVG_SUM8=[SUM($3)], 
AVG_COUNT8=[COUNT($3)], AVG_SUM10=[SUM($6)], AVG_COUNT10=[COUNT($6)], 
COUNT_ORDER=[COUNT()])
+            IndexScan(table=[[PUBLIC, LINEITEM]], index=[L_SD], type=[SORTED], 
searchBounds=[[RangeBounds [lowerBound=null, upperBound=-(1998-12-01, 
7776000000:INTERVAL DAY), lowerInclude=true, upperInclude=true]]], 
filters=[<=($t6, -(1998-12-01, 7776000000:INTERVAL DAY))], projects=[[$t4, $t5, 
$t0, $t1, *($t1, -(1, $t2)), *(*($t1, -(1, $t2)), +(1, $t3)), $t2]], 
requiredColumns=[{4, 5, 6, 7, 8, 9, 10}], collation=[[10]])

Reply via email to