This is an automated email from the ASF dual-hosted git repository.

korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 950a3c0428 IGNITE-20164 Sql. Incorrect propagation of RelCollation 
trait for Sort-based map/reduce aggregates (#2461)
950a3c0428 is described below

commit 950a3c0428e83bf717c5430a91470cf3aee27d58
Author: Max Zhuravkov <[email protected]>
AuthorDate: Fri Sep 1 14:08:13 2023 +0300

    IGNITE-20164 Sql. Incorrect propagation of RelCollation trait for 
Sort-based map/reduce aggregates (#2461)
---
 .../internal/sql/engine/ItAggregatesTest.java      | 411 ++++++++++-----------
 .../sql/engine/rel/agg/IgniteMapSortAggregate.java |   2 +-
 .../engine/rel/agg/IgniteSortAggregateBase.java    |  33 +-
 .../sql/engine/rel/agg/MapReduceAggregates.java    |  25 +-
 .../engine/rule/HashAggregateConverterRule.java    |  20 +-
 .../engine/rule/SortAggregateConverterRule.java    |  32 +-
 .../engine/rule/logical/LogicalOrToUnionRule.java  |   2 +-
 .../sql/engine/trait/TraitsAwareIgniteRel.java     |   2 +-
 .../ignite/internal/sql/engine/util/Commons.java   |  54 +--
 .../ignite/internal/sql/engine/util/PlanUtils.java |  40 +-
 .../ignite/internal/sql/engine/util/RexUtils.java  |   6 +-
 .../exec/rel/HashAggregateExecutionTest.java       |   5 +-
 .../exec/rel/SortAggregateExecutionTest.java       |   7 +-
 .../planner/AbstractAggregatePlannerTest.java      |  72 ++++
 .../sql/engine/planner/AggregatePlannerTest.java   |  55 +++
 .../planner/ColocatedHashAggregatePlannerTest.java |  44 +++
 .../planner/ColocatedSortAggregatePlannerTest.java |  33 ++
 .../engine/planner/MapReduceAggregatesTest.java    | 138 +++++++
 .../planner/MapReduceHashAggregatePlannerTest.java |  56 +++
 .../planner/MapReduceSortAggregatePlannerTest.java |  67 ++++
 .../internal/sql/engine/util/CommonsTest.java      |  26 +-
 21 files changed, 763 insertions(+), 367 deletions(-)

diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItAggregatesTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItAggregatesTest.java
index ace179bcec..e6b5f41a95 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItAggregatesTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItAggregatesTest.java
@@ -57,7 +57,7 @@ public class ItAggregatesTest extends 
ClusterPerClassIntegrationTest {
      * Before all.
      */
     @BeforeAll
-    static void initTestData() throws InterruptedException {
+    static void initTestData() {
         createAndPopulateTable();
 
         sql("CREATE ZONE test_zone with replicas=2, partitions=10");
@@ -71,6 +71,25 @@ public class ItAggregatesTest extends 
ClusterPerClassIntegrationTest {
             sql("INSERT INTO test (id, grp0, grp1, val0, val1) VALUES (?, ?, 
?, ?, ?)", i, i / 10, i / 100, 1, 2);
             sql("INSERT INTO test_one_col_idx (pk, col0) VALUES (?, ?)", i, i);
         }
+
+        sql("CREATE TABLE t1_colo_val1(id INT, val0 VARCHAR, val1 VARCHAR, 
val2 VARCHAR, PRIMARY KEY(id, val1)) "
+                + "COLOCATE BY (val1)");
+
+        sql("CREATE TABLE t2_colo_va1(id INT, val0 VARCHAR, val1 VARCHAR, val2 
VARCHAR, PRIMARY KEY(id, val1)) "
+                + "COLOCATE BY (val1)");
+
+        for (int i = 0; i < 100; i++) {
+            sql("INSERT INTO t1_colo_val1 VALUES (?, ?, ?, ?)", i, "val" + i, 
"val" + i % 2, "val" + i);
+        }
+
+        sql("INSERT INTO t2_colo_va1 VALUES (0, 'val0', 'val0', 'val0'), (1, 
'val1', 'val1', 'val1')");
+
+        sql("CREATE TABLE test_a_b_s (id INTEGER PRIMARY KEY, a INTEGER, b 
INTEGER, s VARCHAR);");
+        sql("INSERT INTO test_a_b_s VALUES (1, 11, 1, 'hello'), (2, 12, 2, 
'world'), (3, 11, 3, NULL)");
+        sql("INSERT INTO test_a_b_s VALUES (4, 11, 3, 'hello'), (5, 12, 2, 
'world'), (6, 10, 5, 'ahello'), (7, 13, 6, 'world')");
+
+        sql("CREATE TABLE test_str_int_real_dec "
+                + "(id INTEGER PRIMARY KEY, str_col VARCHAR, int_col INTEGER,  
real_col REAL, dec_col DECIMAL)");
     }
 
     @ParameterizedTest
@@ -267,94 +286,63 @@ public class ItAggregatesTest extends 
ClusterPerClassIntegrationTest {
 
     @Test
     public void testColocatedAggregate() {
-        try {
-            sql("CREATE TABLE t1(id INT, val0 VARCHAR, val1 VARCHAR, val2 
VARCHAR, PRIMARY KEY(id, val1)) "
-                    + "COLOCATE BY (val1)");
-
-            sql("CREATE TABLE t2(id INT, val0 VARCHAR, val1 VARCHAR, val2 
VARCHAR, PRIMARY KEY(id, val1)) "
-                    + "COLOCATE BY (val1)");
+        String sql = "SELECT val1, count(val2) FROM t1_colo_val1 GROUP BY 
val1";
 
-            for (int i = 0; i < 100; i++) {
-                sql("INSERT INTO t1 VALUES (?, ?, ?, ?)", i, "val" + i, "val" 
+ i % 2, "val" + i);
-            }
-
-            sql("INSERT INTO t2 VALUES (0, 'val0', 'val0', 'val0'), (1, 
'val1', 'val1', 'val1')");
-
-            String sql = "SELECT val1, count(val2) FROM t1 GROUP BY val1";
-
-            assertQuery(sql)
-                    
.matches(QueryChecker.matches(".*Exchange.*Colocated.*Aggregate.*"))
-                    .returns("val0", 50L)
-                    .returns("val1", 50L)
-                    .check();
+        assertQuery(sql)
+                
.matches(QueryChecker.matches(".*Exchange.*Colocated.*Aggregate.*"))
+                .returns("val0", 50L)
+                .returns("val1", 50L)
+                .check();
 
-            sql = "SELECT t2.val1, agg.cnt "
-                    + "FROM t2 JOIN (SELECT val1, COUNT(val2) AS cnt FROM t1 
GROUP BY val1) AS agg ON t2.val1 = agg.val1";
+        sql = "SELECT t2_colo_va1.val1, agg.cnt "
+                + "FROM t2_colo_va1 JOIN (SELECT val1, COUNT(val2) AS cnt FROM 
t1_colo_val1 GROUP BY val1) "
+                + "AS agg ON t2_colo_va1.val1 = agg.val1";
 
-            assertQuery(sql)
-                    
.matches(QueryChecker.matches(".*Exchange.*Join.*Colocated.*Aggregate.*"))
-                    .returns("val0", 50L)
-                    .returns("val1", 50L)
-                    .check();
-        } finally {
-            sql("DROP TABLE IF EXISTS t1");
-            sql("DROP TABLE IF EXISTS t2");
-        }
+        assertQuery(sql)
+                
.matches(QueryChecker.matches(".*Exchange.*Join.*Colocated.*Aggregate.*"))
+                .returns("val0", 50L)
+                .returns("val1", 50L)
+                .check();
     }
 
     @ParameterizedTest
     @MethodSource("provideRules")
     public void testColocatedAggregate(String[] rules) {
-        try {
-            sql("CREATE TABLE t1(id INT, val0 VARCHAR, val1 VARCHAR, val2 
VARCHAR, PRIMARY KEY(id, val1)) "
-                    + "COLOCATE BY (val1)");
-
-            sql("CREATE TABLE t2(id INT, val0 VARCHAR, val1 VARCHAR, val2 
VARCHAR, PRIMARY KEY(id, val1)) "
-                    + "COLOCATE BY (val1)");
-
-            for (int i = 0; i < 100; i++) {
-                sql("INSERT INTO t1 VALUES (?, ?, ?, ?)", i, "val" + i, "val" 
+ i % 2, "val" + i);
-            }
-
-            sql("INSERT INTO t2 VALUES (0, 'val0', 'val0', 'val0'), (1, 
'val1', 'val1', 'val1')");
+        String sql = "SELECT val1, count(val2) FROM t1_colo_val1 GROUP BY 
val1";
 
-            String sql = "SELECT val1, count(val2) FROM t1 GROUP BY val1";
-
-            assertQuery(sql)
-                    .disableRules(rules)
-                    .returns("val0", 50L)
-                    .returns("val1", 50L)
-                    .check();
+        assertQuery(sql)
+                .disableRules(rules)
+                .returns("val0", 50L)
+                .returns("val1", 50L)
+                .check();
 
-            sql = "SELECT t2.val1, agg.cnt "
-                    + "FROM t2 JOIN (SELECT val1, COUNT(val2) AS cnt FROM t1 
GROUP BY val1) AS agg ON t2.val1 = agg.val1";
+        sql = "SELECT t2_colo_va1.val1, agg.cnt "
+                + "FROM t2_colo_va1 JOIN (SELECT val1, COUNT(val2) AS cnt FROM 
t1_colo_val1 GROUP BY val1) "
+                + "AS agg ON t2_colo_va1.val1 = agg.val1";
 
-            assertQuery(sql)
-                    .disableRules(rules)
-                    .returns("val0", 50L)
-                    .returns("val1", 50L)
-                    .check();
-        } finally {
-            sql("DROP TABLE IF EXISTS t1");
-            sql("DROP TABLE IF EXISTS t2");
-        }
+        assertQuery(sql)
+                .disableRules(rules)
+                .returns("val0", 50L)
+                .returns("val1", 50L)
+                .check();
     }
 
     @Test
     public void testEverySomeAggregate() {
-        sql("CREATE TABLE t(c0 INT PRIMARY KEY, c1 INT, c2 INT)");
-        sql("INSERT INTO t VALUES (1, null, 0)");
-        sql("INSERT INTO t VALUES (2, 0, null)");
-        sql("INSERT INTO t VALUES (3, null, null)");
-        sql("INSERT INTO t VALUES (4, 0, 1)");
-        sql("INSERT INTO t VALUES (5, 1, 1)");
-        sql("INSERT INTO t VALUES (6, 1, 2)");
-        sql("INSERT INTO t VALUES (7, 2, 2)");
-
-        assertQuery("SELECT EVERY(c1 < c2) FROM t").returns(false).check();
-        assertQuery("SELECT SOME(c1 < c2) FROM t").returns(true).check();
-        assertQuery("SELECT EVERY(c1 <= c2) FROM t").returns(true).check();
-        assertQuery("SELECT SOME(c1 > c2) FROM t").returns(false).check();
+        sql("DELETE FROM test_a_b_s");
+
+        sql("INSERT INTO test_a_b_s(id, a, b) VALUES (1, null, 0)");
+        sql("INSERT INTO test_a_b_s(id, a, b) VALUES (2, 0, null)");
+        sql("INSERT INTO test_a_b_s(id, a, b) VALUES (3, null, null)");
+        sql("INSERT INTO test_a_b_s(id, a, b) VALUES (4, 0, 1)");
+        sql("INSERT INTO test_a_b_s(id, a, b) VALUES (5, 1, 1)");
+        sql("INSERT INTO test_a_b_s(id, a, b) VALUES (6, 1, 2)");
+        sql("INSERT INTO test_a_b_s(id, a, b) VALUES (7, 2, 2)");
+
+        assertQuery("SELECT EVERY(a < b) FROM 
test_a_b_s").returns(false).check();
+        assertQuery("SELECT SOME(a < b) FROM 
test_a_b_s").returns(true).check();
+        assertQuery("SELECT EVERY(a <= b) FROM 
test_a_b_s").returns(true).check();
+        assertQuery("SELECT SOME(a > b) FROM 
test_a_b_s").returns(false).check();
     }
 
     @Test
@@ -390,199 +378,188 @@ public class ItAggregatesTest extends 
ClusterPerClassIntegrationTest {
     @MethodSource("provideRules")
     @WithSystemProperty(key = "IMPLICIT_PK_ENABLED", value = "true")
     public void testDifferentAgg(String[] rules) {
-        try {
-            sql("CREATE TABLE testMe (a INTEGER, b INTEGER, s VARCHAR);");
-            sql("INSERT INTO testMe VALUES (11, 1, 'hello'), (12, 2, 'world'), 
(11, 3, NULL)");
-            sql("INSERT INTO testMe VALUES (11, 3, 'hello'), (12, 2, 'world'), 
(10, 5, 'ahello'), (13, 6, 'world')");
-
-            assertQuery("SELECT DISTINCT(a) as a FROM testMe ORDER BY a")
-                    .disableRules(rules)
-                    .returns(10)
-                    .returns(11)
-                    .returns(12)
-                    .returns(13)
-                    .check();
-
-            assertQuery("SELECT COUNT(*) FROM testMe")
-                    .disableRules(rules)
-                    .returns(7L)
-                    .check();
+        assertQuery("SELECT DISTINCT(a) as a FROM test_a_b_s ORDER BY a")
+                .disableRules(rules)
+                .returns(10)
+                .returns(11)
+                .returns(12)
+                .returns(13)
+                .check();
 
-            // Such kind of queries can`t be processed with
-            if (Arrays.stream(rules).anyMatch(r -> 
r.contains("MapReduceSortAggregateConverterRule"))) {
-                assertQuery("SELECT COUNT(a), COUNT(DISTINCT(b)) FROM testMe")
-                        .disableRules(rules)
-                        .returns(7L, 5L)
-                        .check();
-            }
+        assertQuery("SELECT COUNT(*) FROM test_a_b_s")
+                .disableRules(rules)
+                .returns(7L)
+                .check();
 
-            assertQuery("SELECT COUNT(a) as a, s FROM testMe GROUP BY s ORDER 
BY a, s")
+        // Such kind of queries can`t be processed with
+        if (Arrays.stream(rules).anyMatch(r -> 
r.contains("MapReduceSortAggregateConverterRule"))) {
+            assertQuery("SELECT COUNT(a), COUNT(DISTINCT(b)) FROM test_a_b_s")
                     .disableRules(rules)
-                    .returns(1L, "ahello")
-                    .returns(1L, null)
-                    .returns(2L, "hello")
-                    .returns(3L, "world")
+                    .returns(7L, 5L)
                     .check();
+        }
 
-            if (Arrays.stream(rules).noneMatch(MAP_REDUCE_RULES::contains)) {
-                assertQuery("SELECT COUNT(a) as a, AVG(a) as b, MIN(a), 
MIN(b), s FROM testMe GROUP BY s ORDER BY a, b")
-                        .disableRules(rules)
-                        .returns(1L, 10, 10, 5, "ahello")
-                        .returns(1L, 11, 11, 3, null)
-                        .returns(2L, 11, 11, 1, "hello")
-                        .returns(3L, 12, 12, 2, "world")
-                        .check();
-
-                assertQuery("SELECT COUNT(a) as a, AVG(a) as bb, MIN(a), 
MIN(b), s FROM testMe GROUP BY s, b ORDER BY a, s")
-                        .disableRules(rules)
-                        .returns(1L, 10, 10, 5, "ahello")
-                        .returns(1L, 11, 11, 1, "hello")
-                        .returns(1L, 11, 11, 3, "hello")
-                        .returns(1L, 13, 13, 6, "world")
-                        .returns(1L, 11, 11, 3, null)
-                        .returns(2L, 12, 12, 2, "world")
-                        .check();
-            }
-
+        assertQuery("SELECT COUNT(a) as a, s FROM test_a_b_s GROUP BY s ORDER 
BY a, s")
+                .disableRules(rules)
+                .returns(1L, "ahello")
+                .returns(1L, null)
+                .returns(2L, "hello")
+                .returns(3L, "world")
+                .check();
 
-            assertQuery("SELECT COUNT(a) FROM testMe")
+        if (Arrays.stream(rules).noneMatch(MAP_REDUCE_RULES::contains)) {
+            assertQuery("SELECT COUNT(a) as a, AVG(a) as b, MIN(a), MIN(b), s 
FROM test_a_b_s GROUP BY s ORDER BY a, b")
                     .disableRules(rules)
-                    .returns(7L)
+                    .returns(1L, 10, 10, 5, "ahello")
+                    .returns(1L, 11, 11, 3, null)
+                    .returns(2L, 11, 11, 1, "hello")
+                    .returns(3L, 12, 12, 2, "world")
                     .check();
 
-            assertQuery("SELECT COUNT(DISTINCT(a)) FROM testMe")
+            assertQuery("SELECT COUNT(a) as a, AVG(a) as bb, MIN(a), MIN(b), s 
FROM test_a_b_s GROUP BY s, b ORDER BY a, s")
                     .disableRules(rules)
-                    .returns(4L)
+                    .returns(1L, 10, 10, 5, "ahello")
+                    .returns(1L, 11, 11, 1, "hello")
+                    .returns(1L, 11, 11, 3, "hello")
+                    .returns(1L, 13, 13, 6, "world")
+                    .returns(1L, 11, 11, 3, null)
+                    .returns(2L, 12, 12, 2, "world")
                     .check();
+        }
 
-            assertQuery("SELECT COUNT(a), COUNT(s), COUNT(*) FROM testMe")
-                    .disableRules(rules)
-                    .returns(7L, 6L, 7L)
-                    .check();
 
-            if (Arrays.stream(rules).noneMatch(MAP_REDUCE_RULES::contains)) {
-                assertQuery("SELECT AVG(a) FROM testMe")
-                        .disableRules(rules)
-                        .returns(11)
-                        .check();
-            }
+        assertQuery("SELECT COUNT(a) FROM test_a_b_s")
+                .disableRules(rules)
+                .returns(7L)
+                .check();
 
-            assertQuery("SELECT MIN(a) FROM testMe")
-                    .disableRules(rules)
-                    .returns(10)
-                    .check();
+        assertQuery("SELECT COUNT(DISTINCT(a)) FROM test_a_b_s")
+                .disableRules(rules)
+                .returns(4L)
+                .check();
 
-            assertQuery("SELECT COUNT(a), COUNT(DISTINCT(a)) FROM testMe")
-                    .disableRules(rules)
-                    .returns(7L, 4L)
-                    .check();
+        assertQuery("SELECT COUNT(a), COUNT(s), COUNT(*) FROM test_a_b_s")
+                .disableRules(rules)
+                .returns(7L, 6L, 7L)
+                .check();
 
-            assertQuery("SELECT COUNT(a), COUNT(DISTINCT a), SUM(a), 
SUM(DISTINCT a) FROM testMe")
+        if (Arrays.stream(rules).noneMatch(MAP_REDUCE_RULES::contains)) {
+            assertQuery("SELECT AVG(a) FROM test_a_b_s")
                     .disableRules(rules)
-                    .returns(7L, 4L, 80L, 46L)
+                    .returns(11)
                     .check();
-        } finally {
-            sql("DROP TABLE IF EXISTS testMe");
         }
+
+        assertQuery("SELECT MIN(a) FROM test_a_b_s")
+                .disableRules(rules)
+                .returns(10)
+                .check();
+
+        assertQuery("SELECT COUNT(a), COUNT(DISTINCT(a)) FROM test_a_b_s")
+                .disableRules(rules)
+                .returns(7L, 4L)
+                .check();
+
+        assertQuery("SELECT COUNT(a), COUNT(DISTINCT a), SUM(a), SUM(DISTINCT 
a) FROM test_a_b_s")
+                .disableRules(rules)
+                .returns(7L, 4L, 80L, 46L)
+                .check();
     }
 
     @ParameterizedTest
     @MethodSource("provideRules")
-    @WithSystemProperty(key = "IMPLICIT_PK_ENABLED", value = "true")
     public void checkEmptyTable(String[] rules) {
-        sql("CREATE TABLE t (a INTEGER, b INTEGER)");
+        sql("DELETE FROM test_a_b_s");
 
-        try {
-            assertQuery("SELECT min(b) FROM t GROUP BY a")
-                    .disableRules(rules)
-                    .returnNothing().check();
-        } finally {
-            sql("DROP TABLE t");
-        }
+        assertQuery("SELECT min(b) FROM test_a_b_s GROUP BY a")
+                .disableRules(rules)
+                .returnNothing().check();
     }
 
     @ParameterizedTest
     @MethodSource("rulesForGroupingSets")
     public void testGroupingSets(String[] rules) {
-        try {
-            sql("CREATE TABLE test1 (id INTEGER PRIMARY KEY, str_col VARCHAR, 
int_col INTEGER);");
-            sql("INSERT INTO test1 VALUES (1, 's1', 10)");
-            sql("INSERT INTO test1 VALUES (2, 's1', 20)");
-            sql("INSERT INTO test1 VALUES (3, 's2', 10)");
-            sql("INSERT INTO test1 VALUES (4, 's3', 40)");
-
-            assertQuery("SELECT str_col, SUM(int_col), COUNT(str_col) FROM 
test1 GROUP BY GROUPING SETS "
-                    + "( (str_col, int_col), (str_col), (int_col), () ) HAVING 
SUM(int_col) > 0")
-                    .disableRules(rules)
-                    .returns(null, 80L, 4L)
-                    .returns("s1", 10L, 1L)
-                    .returns("s3", 40L, 1L)
-                    .returns("s1", 20L, 1L)
-                    .returns("s2", 10L, 1L)
-                    .returns("s2", 10L, 1L)
-                    .returns("s3", 40L, 1L)
-                    .returns("s1", 30L, 2L)
-                    .returns(null, 40L, 1L)
-                    .returns(null, 20L, 2L)
-                    .returns(null, 20L, 1L)
-                    .check();
+        sql("DELETE FROM test_str_int_real_dec");
 
-        } finally {
-            sql("DROP TABLE test1");
-        }
+        sql("INSERT INTO test_str_int_real_dec(id, str_col, int_col) VALUES 
(1, 's1', 10)");
+        sql("INSERT INTO test_str_int_real_dec(id, str_col, int_col) VALUES 
(2, 's1', 20)");
+        sql("INSERT INTO test_str_int_real_dec(id, str_col, int_col) VALUES 
(3, 's2', 10)");
+        sql("INSERT INTO test_str_int_real_dec(id, str_col, int_col) VALUES 
(4, 's3', 40)");
+
+        assertQuery("SELECT str_col, SUM(int_col), COUNT(str_col) FROM 
test_str_int_real_dec GROUP BY GROUPING SETS "
+                + "( (str_col, int_col), (str_col), (int_col), () ) HAVING 
SUM(int_col) > 0")
+                .disableRules(rules)
+                .returns(null, 80L, 4L)
+                .returns("s1", 10L, 1L)
+                .returns("s3", 40L, 1L)
+                .returns("s1", 20L, 1L)
+                .returns("s2", 10L, 1L)
+                .returns("s2", 10L, 1L)
+                .returns("s3", 40L, 1L)
+                .returns("s1", 30L, 2L)
+                .returns(null, 40L, 1L)
+                .returns(null, 20L, 2L)
+                .returns(null, 20L, 1L)
+                .check();
     }
 
     @ParameterizedTest
     @MethodSource("rulesForGroupingSets")
     public void testDuplicateGroupingSets(String[] rules) {
-        try {
-            sql("CREATE TABLE test1 (id INTEGER PRIMARY KEY, str_col VARCHAR, 
int_col INTEGER);");
-            sql("INSERT INTO test1 VALUES (1, 's1', 10)");
-            sql("INSERT INTO test1 VALUES (2, 's1', 20)");
-            sql("INSERT INTO test1 VALUES (3, 's2', 10)");
+        sql("DELETE FROM test_str_int_real_dec");
 
-            assertQuery("SELECT str_col  FROM test1 GROUP BY GROUPING SETS 
((str_col), (), (str_col), ()) ORDER BY str_col")
-                    .disableRules(rules)
-                    .returns("s1")
-                    .returns("s2")
-                    .returns(null)
-                    .returns("s1")
-                    .returns("s2")
-                    .returns(null)
-                    .check();
-        } finally {
-            sql("DROP TABLE test1");
-        }
+        sql("INSERT INTO test_str_int_real_dec(id, str_col, int_col) VALUES 
(1, 's1', 10)");
+        sql("INSERT INTO test_str_int_real_dec(id, str_col, int_col) VALUES 
(2, 's1', 20)");
+        sql("INSERT INTO test_str_int_real_dec(id, str_col, int_col) VALUES 
(3, 's2', 10)");
+
+        assertQuery("SELECT str_col  FROM test_str_int_real_dec GROUP BY 
GROUPING SETS ((str_col), (), (str_col), ()) ORDER BY str_col")
+                .disableRules(rules)
+                .returns("s1")
+                .returns("s2")
+                .returns(null)
+                .returns("s1")
+                .returns("s2")
+                .returns(null)
+                .check();
     }
 
     @ParameterizedTest
     @MethodSource("provideRules")
     public void testAvgOnEmptyGroup(String[] rules) {
+        sql("DELETE FROM test_str_int_real_dec");
+
         // TODO https://issues.apache.org/jira/browse/IGNITE-20009
         //  Remove after is fixed.
         Assumptions.assumeFalse(Arrays.stream(rules)
                 .filter(COLO_RULES::contains).count() == COLO_RULES.size(), 
"AVG is disabled for MAP/REDUCE");
 
-        try {
-            sql("CREATE TABLE test1 (id INTEGER PRIMARY KEY, str_col VARCHAR, 
int_col INTEGER, real_col REAL, dec_col DECIMAL);");
+        assertQuery("SELECT AVG(int_col) FROM test_str_int_real_dec")
+                .disableRules(rules)
+                .returns(new Object[]{null})
+                .check();
 
-            assertQuery("SELECT AVG(int_col) FROM test1")
-                    .disableRules(rules)
-                    .returns(new Object[]{null})
-                    .check();
+        assertQuery("SELECT AVG(real_col) FROM test_str_int_real_dec")
+                .disableRules(rules)
+                .returns(new Object[]{null})
+                .check();
 
-            assertQuery("SELECT AVG(real_col) FROM test1")
-                    .disableRules(rules)
-                    .returns(new Object[]{null})
-                    .check();
+        assertQuery("SELECT AVG(dec_col) FROM test_str_int_real_dec")
+                .disableRules(rules)
+                .returns(new Object[]{null})
+                .check();
+    }
 
-            assertQuery("SELECT AVG(dec_col) FROM test1")
-                    .disableRules(rules)
-                    .returns(new Object[]{null})
-                    .check();
-        } finally {
-            sql("DROP TABLE test1");
-        }
+    @ParameterizedTest
+    @MethodSource("provideRules")
+    public void testAggDistinctGroupSet(String[] rules) {
+        sql("DELETE FROM test_a_b_s");
+
+        sql("INSERT INTO test_a_b_s (id, a, b) VALUES (1, 11, 2), (2, 12, 2), 
(3, 12, 3)");
+
+        assertQuery("SELECT COUNT(a), COUNT(DISTINCT(b)) FROM test_a_b_s")
+                .disableRules(rules)
+                .returns(3L, 2L)
+                .check();
     }
 
     private static Stream<Arguments> rulesForGroupingSets() {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/IgniteMapSortAggregate.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/IgniteMapSortAggregate.java
index d2cab69c0c..0e7ef67d2e 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/IgniteMapSortAggregate.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/IgniteMapSortAggregate.java
@@ -90,7 +90,7 @@ public class IgniteMapSortAggregate extends 
IgniteMapAggregateBase implements Ig
             List<ImmutableBitSet> groupSets,
             List<AggregateCall> aggCalls) {
         return new IgniteMapSortAggregate(
-                getCluster(), traitSet, input, groupSet, groupSets, aggCalls, 
TraitUtils.collation(traitSet));
+                getCluster(), traitSet, input, groupSet, groupSets, aggCalls, 
this.collation);
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/IgniteSortAggregateBase.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/IgniteSortAggregateBase.java
index eaac81c14a..d763c49b95 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/IgniteSortAggregateBase.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/IgniteSortAggregateBase.java
@@ -29,12 +29,12 @@ import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
 import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
 import org.apache.ignite.internal.sql.engine.trait.TraitsAwareIgniteRel;
 
 /**
- * IgniteSortAggregateBase interface.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Defines common methods for {@link IgniteRel relational nodes} that 
implement sort-base aggregates.
  */
 interface IgniteSortAggregateBase extends TraitsAwareIgniteRel {
     /**
@@ -44,7 +44,34 @@ interface IgniteSortAggregateBase extends 
TraitsAwareIgniteRel {
      */
     ImmutableBitSet getGroupSet();
 
-    /** {@inheritDoc} */
+    /**
+     * Performs propagation of {@link RelCollation collation trait}.
+     *
+     * <p>Sorted aggregate operate on already sorted input to perform 
aggregation/grouping.
+     * Because of that such operator produces rows sorted by grouping columns, 
and it can produce
+     * results that may satisfy required collation trait (ordering).
+     *
+     * <p>If a grouping set contains all required ordering columns, then 
inputs should provide
+     * ordering that includes all required ordering columns + the rest of the 
columns used in the grouping set:
+
+     * <pre>
+     *       GROUP BY a, b, c ORDER BY a, b
+     *       Input collation: (a, b, c)
+     * </pre>
+     *
+     * <p>Otherwise this operator is unable to fully satisfy required ordering 
and we require collation trait based
+     * columns from the grouping set, and the rest of the requirements is 
going to be satisfied by enforcer operator:
+     * <pre>
+     *     GROUP BY a, b ORDER BY a, b, c
+     *     Input collation (a, b)
+     *     Enforcer: adds ordering by c
+     * </pre>
+     *
+     * @param nodeTraits Required relational node output traits.
+     * @param inputTraits Traits of input nodes.
+     *
+     * @return Traits satisfied by this expression and traits that input nodes 
should satisfy.
+     */
     @Override
     default Pair<RelTraitSet, List<RelTraitSet>> passThroughCollation(
             RelTraitSet nodeTraits, List<RelTraitSet> inputTraits
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/MapReduceAggregates.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/MapReduceAggregates.java
index d539aaacd0..53d4ad0da3 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/MapReduceAggregates.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/MapReduceAggregates.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.AggregateCall;
@@ -39,6 +40,8 @@ import org.apache.calcite.sql.SqlAggFunction;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.mapping.Mapping;
+import org.apache.calcite.util.mapping.Mappings;
 import org.apache.ignite.internal.sql.engine.rel.IgniteProject;
 import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
@@ -86,9 +89,15 @@ public class MapReduceAggregates {
     }
 
     /**
-     * Creates a physical operator that implements the given logical aggregate 
as MAP/reduce.
+     * Creates a physical operator that implements the given logical aggregate 
as MAP/REDUCE.
+     *
+     * @param agg Logical aggregate expression.
+     * @param builder Builder to create implementations of MAP and REDUCE 
phases.
+     * @param fieldMappingOnReduce Mapping to be applied to group sets on 
REDUCE phase.
+     *
+     * @return A physical node tree that implements the given logical operator.
      */
-    public static IgniteRel buildAggregates(LogicalAggregate agg, 
AggregateRelBuilder builder) {
+    public static IgniteRel buildAggregates(LogicalAggregate agg, 
AggregateRelBuilder builder, Mapping fieldMappingOnReduce) {
 
         //
         // To implement MAP/REDUCE aggregate LogicalAggregate is transformed 
into
@@ -189,11 +198,17 @@ public class MapReduceAggregates {
         assert mapAggCalls.size() <= reduceAggCalls.size() :
                 format("The number of MAP/REDUCE aggregates is not correct. 
MAP: {}\nREDUCE: {}", mapAggCalls, reduceAggCalls);
 
+        // Apply mapping to groupSet/groupSets on REDUCE phase.
+        ImmutableBitSet groupSetOnReduce = 
Mappings.apply(fieldMappingOnReduce, agg.getGroupSet());
+        List<ImmutableBitSet> groupSetsOnReduce = agg.getGroupSets().stream()
+                .map(g -> Mappings.apply(fieldMappingOnReduce, g))
+                .collect(Collectors.toList());
+
         IgniteRel reduce = builder.makeReduceAgg(
                 agg.getCluster(),
                 map,
-                agg.getGroupSet(),
-                agg.getGroupSets(),
+                groupSetOnReduce,
+                groupSetsOnReduce,
                 reduceAggCalls,
                 reduceTypeToUse
         );
@@ -252,7 +267,7 @@ public class MapReduceAggregates {
     }
 
     /**
-     * Used by {@link #buildAggregates(LogicalAggregate, AggregateRelBuilder)}
+     * Used by {@link #buildAggregates(LogicalAggregate, AggregateRelBuilder, 
Mapping)}.
      * to create MAP/REDUCE aggregate nodes.
      */
     public interface AggregateRelBuilder {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateConverterRule.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateConverterRule.java
index d6bb88e10a..b14010d030 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateConverterRule.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateConverterRule.java
@@ -21,7 +21,6 @@ import static 
org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates.
 import static 
org.apache.ignite.internal.sql.engine.util.PlanUtils.complexDistinctAgg;
 
 import java.util.List;
-import java.util.stream.Collectors;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelOptRule;
@@ -44,7 +43,6 @@ import 
org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates.Aggrega
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.sql.engine.util.HintUtils;
-import org.apache.ignite.internal.sql.engine.util.PlanUtils;
 
 /**
  * Planner rule that recognizes a {@link org.apache.calcite.rel.core.Aggregate}
@@ -106,6 +104,7 @@ public class HashAggregateConverterRule {
             RelOptCluster cluster = agg.getCluster();
             RelTraitSet inTrait = 
cluster.traitSetOf(IgniteConvention.INSTANCE);
             RelTraitSet outTrait = 
cluster.traitSetOf(IgniteConvention.INSTANCE);
+            Mapping fieldMappingOnReduce = 
Commons.trimmingMapping(agg.getGroupSet().length(), agg.getGroupSet());
 
             AggregateRelBuilder relBuilder = new AggregateRelBuilder() {
                 @Override
@@ -115,8 +114,8 @@ public class HashAggregateConverterRule {
                             cluster,
                             outTrait.replace(IgniteDistributions.random()),
                             input,
-                            agg.getGroupSet(),
-                            agg.getGroupSets(),
+                            groupSet,
+                            groupSets,
                             aggregateCalls
                     );
                 }
@@ -125,26 +124,19 @@ public class HashAggregateConverterRule {
                 public IgniteRel makeReduceAgg(RelOptCluster cluster, RelNode 
map, ImmutableBitSet groupSet,
                         List<ImmutableBitSet> groupSets, List<AggregateCall> 
aggregateCalls, RelDataType outputType) {
 
-                    Mapping mapping = 
PlanUtils.computeAggFieldMapping(groupSets);
-
-                    ImmutableBitSet groupSet1 = Commons.mapBitSet(groupSet, 
mapping);
-                    List<ImmutableBitSet> groupSets1 = groupSets.stream()
-                            .map(g -> Commons.mapBitSet(g, mapping))
-                            .collect(Collectors.toList());
-
                     return new IgniteReduceHashAggregate(
                             cluster,
                             outTrait.replace(IgniteDistributions.single()),
                             convert(map, 
inTrait.replace(IgniteDistributions.single())),
-                            groupSet1,
-                            groupSets1,
+                            groupSet,
+                            groupSets,
                             aggregateCalls,
                             outputType
                     );
                 }
             };
 
-            return MapReduceAggregates.buildAggregates(agg, relBuilder);
+            return MapReduceAggregates.buildAggregates(agg, relBuilder, 
fieldMappingOnReduce);
         }
     }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/SortAggregateConverterRule.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/SortAggregateConverterRule.java
index d76df11d2a..9a636c39af 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/SortAggregateConverterRule.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/SortAggregateConverterRule.java
@@ -34,6 +34,7 @@ import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.mapping.Mapping;
 import org.apache.ignite.internal.sql.engine.rel.IgniteConvention;
 import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
 import 
org.apache.ignite.internal.sql.engine.rel.agg.IgniteColocatedSortAggregate;
@@ -43,6 +44,7 @@ import 
org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates;
 import 
org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates.AggregateRelBuilder;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
+import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.sql.engine.util.HintUtils;
 import org.jetbrains.annotations.Nullable;
 
@@ -118,8 +120,14 @@ public class SortAggregateConverterRule {
             RelOptCluster cluster = agg.getCluster();
             RelCollation collation = 
TraitUtils.createCollation(agg.getGroupSet().asList());
 
-            RelTraitSet inTrait = 
cluster.traitSetOf(IgniteConvention.INSTANCE).replace(collation);
-            RelTraitSet outTrait = 
cluster.traitSetOf(IgniteConvention.INSTANCE).replace(collation);
+            // Create mapping to adjust fields on REDUCE phase.
+            Mapping fieldMappingOnReduce = 
Commons.trimmingMapping(agg.getGroupSet().length(), agg.getGroupSet());
+
+            // Adjust columns in output collation.
+            RelCollation outputCollation = 
collation.apply(fieldMappingOnReduce);
+
+            RelTraitSet inTraits = 
cluster.traitSetOf(IgniteConvention.INSTANCE).replace(collation);
+            RelTraitSet outTraits = 
cluster.traitSetOf(IgniteConvention.INSTANCE).replace(outputCollation);
 
             AggregateRelBuilder relBuilder = new AggregateRelBuilder() {
                 @Override
@@ -128,10 +136,10 @@ public class SortAggregateConverterRule {
 
                     return new IgniteMapSortAggregate(
                             cluster,
-                            outTrait.replace(IgniteDistributions.random()),
-                            convert(input, 
inTrait.replace(IgniteDistributions.random())),
-                            agg.getGroupSet(),
-                            agg.getGroupSets(),
+                            outTraits.replace(IgniteDistributions.random()),
+                            convert(input, 
inTraits.replace(IgniteDistributions.random())),
+                            groupSet,
+                            groupSets,
                             aggregateCalls,
                             collation
                     );
@@ -143,18 +151,18 @@ public class SortAggregateConverterRule {
 
                     return new IgniteReduceSortAggregate(
                             cluster,
-                            outTrait.replace(IgniteDistributions.single()),
-                            convert(map, 
inTrait.replace(IgniteDistributions.single())),
-                            agg.getGroupSet(),
-                            agg.getGroupSets(),
+                            outTraits.replace(IgniteDistributions.single()),
+                            convert(map, 
outTraits.replace(IgniteDistributions.single())),
+                            groupSet,
+                            groupSets,
                             aggregateCalls,
                             outputType,
-                            collation
+                            outputCollation
                     );
                 }
             };
 
-            return MapReduceAggregates.buildAggregates(agg, relBuilder);
+            return MapReduceAggregates.buildAggregates(agg, relBuilder, 
fieldMappingOnReduce);
         }
     }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/logical/LogicalOrToUnionRule.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/logical/LogicalOrToUnionRule.java
index 2c58e626fc..6174fa4ebc 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/logical/LogicalOrToUnionRule.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/logical/LogicalOrToUnionRule.java
@@ -148,7 +148,7 @@ public class LogicalOrToUnionRule extends 
RelRule<LogicalOrToUnionRule.Config> {
         }
 
         Mappings.TargetMapping mapping = scan.requiredColumns() == null ? null 
:
-                Commons.inverseTrimmingMapping(fieldCnt, 
scan.requiredColumns());
+                Commons.trimmingMapping(fieldCnt, scan.requiredColumns());
 
         for (RexNode op : operands) {
             BitSet conditionFields = new BitSet(fieldCnt);
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitsAwareIgniteRel.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitsAwareIgniteRel.java
index 2b4ce030a9..23811d7be9 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitsAwareIgniteRel.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitsAwareIgniteRel.java
@@ -87,7 +87,7 @@ public interface TraitsAwareIgniteRel extends IgniteRel {
      *
      * @param nodeTraits Relational node output traits.
      * @param inTraits   Relational node input traits.
-     * @return List of possible input-output traits combinations.
+     * @return Traits satisfied by this expression and traits that input nodes 
should satisfy.
      */
     default Pair<RelTraitSet, List<RelTraitSet>> 
passThroughCollation(RelTraitSet nodeTraits, List<RelTraitSet> inTraits) {
         if (inTraits.size() > 1) {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
index 91f5984eb9..1bf5f3a86c 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
@@ -454,6 +454,8 @@ public final class Commons {
      *
      * <p>To find a new index of element after trimming call {@code 
mapping.getTargetOpt(index)}.
      *
+     * <p>To find an old index of element before trimming call {@code 
mapping.getSourceOpt(index)}.
+     *
      * <p>This mapping can be used to adjust traits or aggregations, for 
example, when several fields have been truncated.
      * Assume the following scenario:
      * <pre>
@@ -477,62 +479,14 @@ public final class Commons {
      * @see org.apache.calcite.plan.RelTrait#apply(TargetMapping)
      * @see org.apache.calcite.rel.core.AggregateCall#transform(TargetMapping)
      */
-    public static Mappings.TargetMapping trimmingMapping(int sourceSize, 
ImmutableBitSet requiredElements) {
-        Mapping mapping = Mappings.create(MappingType.PARTIAL_FUNCTION, 
sourceSize, requiredElements.cardinality());
+    public static Mapping trimmingMapping(int sourceSize, ImmutableBitSet 
requiredElements) {
+        Mapping mapping = Mappings.create(MappingType.INVERSE_SURJECTION, 
sourceSize, requiredElements.cardinality());
         for (Ord<Integer> ord : Ord.zip(requiredElements)) {
             mapping.set(ord.e, ord.i);
         }
         return mapping;
     }
 
-    /**
-     * Create a mapping to redo trimming made by {@link #trimmingMapping(int, 
ImmutableBitSet)}.
-     *
-     * <p>To find an old index of element before trimming call {@code 
mapping.getSourceOpt(index)}.
-     *
-     * <p>This mapping can be used to remap traits or aggregates back as if 
the trimming has never happened.
-     *
-     * @param sourceSize Count of elements in a non trimmed collection.
-     * @param requiredElements Elements which were preserved during trimming.
-     * @return A mapping to restore the original mapping.
-     * @see #trimmingMapping(int, ImmutableBitSet)
-     */
-    public static Mappings.TargetMapping inverseTrimmingMapping(int 
sourceSize, ImmutableBitSet requiredElements) {
-        return Mappings.invert(trimmingMapping(sourceSize, 
requiredElements).inverse());
-    }
-
-    /**
-     * Produces new bitset setting bits according to the given mapping.
-     *
-     * <pre>
-     * bitset:
-     *   [0, 1, 4]
-     * mapping:
-     *   1 -> 0
-     *   0 -> 1
-     *   4 -> 3
-     * result:
-     *   [0, 1, 3]
-     * </pre>
-     *
-     * @param bitset A bitset.
-     * @param mapping Mapping to use.
-     * @return  a transformed bit set.
-     */
-    public static ImmutableBitSet mapBitSet(ImmutableBitSet bitset, Mapping 
mapping) {
-        ImmutableBitSet.Builder result = ImmutableBitSet.builder();
-
-        int bitPos = bitset.nextSetBit(0);
-
-        while (bitPos != -1) {
-            int target = mapping.getTarget(bitPos);
-            result.set(target);
-            bitPos = bitset.nextSetBit(bitPos + 1);
-        }
-
-        return result.build();
-    }
-
 
     /**
      * Checks if there is a such permutation of all {@code elems} that is 
prefix of provided {@code seq}.
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/PlanUtils.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/PlanUtils.java
index ffae97bf89..24e5e49602 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/PlanUtils.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/PlanUtils.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.sql.engine.util;
 
-import java.util.BitSet;
 import java.util.List;
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.type.RelDataType;
@@ -31,8 +30,6 @@ import org.apache.calcite.sql.fun.SqlSumAggFunction;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.mapping.Mapping;
-import org.apache.calcite.util.mapping.MappingType;
-import org.apache.calcite.util.mapping.Mappings;
 import org.apache.ignite.internal.sql.engine.exec.exp.agg.Accumulator;
 import org.apache.ignite.internal.sql.engine.exec.exp.agg.Accumulators;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
@@ -107,7 +104,8 @@ public class PlanUtils {
     public static RelDataType createHashAggRowType(List<ImmutableBitSet> 
groupSets,
             IgniteTypeFactory typeFactory, RelDataType inputType, 
List<AggregateCall> aggregateCalls) {
 
-        Mapping mapping = computeAggFieldMapping(groupSets);
+        ImmutableBitSet fieldIndices = ImmutableBitSet.union(groupSets);
+        Mapping mapping = Commons.trimmingMapping(fieldIndices.length(), 
fieldIndices);
 
         RelDataTypeFactory.Builder builder = typeFactory.builder();
 
@@ -124,40 +122,6 @@ public class PlanUtils {
         return builder.build();
     }
 
-    /**
-     * Creates grouping set keys mapping for REDUCE phase of MAP/REDUCE 
aggregates.
-     * <pre>
-     * group sets:
-     *   [0], [2, 0], [3]
-     * mapping:
-     *   0 -> 0
-     *   2 -> 1
-     *   3 -> 2
-     * </pre>
-     */
-    public static Mapping computeAggFieldMapping(List<ImmutableBitSet> 
groupingSets) {
-        BitSet fieldIndices = new BitSet();
-
-        for (ImmutableBitSet groupingSet : groupingSets) {
-            for (int field : groupingSet) {
-                fieldIndices.set(field);
-            }
-        }
-
-        Mapping mapping = Mappings.create(MappingType.INVERSE_SURJECTION, 
fieldIndices.length(), fieldIndices.cardinality());
-
-        int i = 0;
-        int bitPos = fieldIndices.nextSetBit(0);
-
-        while (bitPos != -1) {
-            mapping.set(bitPos, i);
-            bitPos = fieldIndices.nextSetBit(bitPos + 1);
-            i++;
-        }
-
-        return mapping;
-    }
-
     private static void addAccumulatorFields(IgniteTypeFactory typeFactory, 
List<AggregateCall> aggregateCalls, Builder builder) {
         Accumulators accumulators = new Accumulators(typeFactory);
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/RexUtils.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/RexUtils.java
index 606f7d7dc0..d65185813e 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/RexUtils.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/RexUtils.java
@@ -260,7 +260,7 @@ public class RexUtils {
         Mappings.TargetMapping mapping = null;
 
         if (requiredColumns != null) {
-            mapping = Commons.inverseTrimmingMapping(types.size(), 
requiredColumns);
+            mapping = Commons.trimmingMapping(types.size(), requiredColumns);
         }
 
         List<SearchBounds> bounds = Arrays.asList(new 
SearchBounds[collation.getFieldCollations().size()]);
@@ -337,7 +337,7 @@ public class RexUtils {
 
         Mappings.TargetMapping toTrimmedRowMapping = null;
         if (requiredColumns != null) {
-            toTrimmedRowMapping = Commons.inverseTrimmingMapping(types.size(), 
requiredColumns);
+            toTrimmedRowMapping = Commons.trimmingMapping(types.size(), 
requiredColumns);
         }
 
         List<RelFieldCollation> fieldCollations = 
collation.getFieldCollations();
@@ -392,7 +392,7 @@ public class RexUtils {
         Mappings.TargetMapping mapping = null;
 
         if (requiredColumns != null) {
-            mapping = Commons.inverseTrimmingMapping(types.size(), 
requiredColumns);
+            mapping = Commons.trimmingMapping(types.size(), requiredColumns);
         }
 
         for (Entry<List<RexCall>> fld : 
fieldsToPredicates.int2ObjectEntrySet()) {
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateExecutionTest.java
index 3ee52dd67a..daaa6e7f25 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateExecutionTest.java
@@ -37,7 +37,7 @@ import 
org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler;
 import org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates;
 import 
org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates.MapReduceAgg;
-import org.apache.ignite.internal.sql.engine.util.PlanUtils;
+import org.apache.ignite.internal.sql.engine.util.Commons;
 
 /**
  * HashAggregateExecutionTest.
@@ -116,7 +116,8 @@ public class HashAggregateExecutionTest extends 
BaseAggregateTest {
 
         aggMap.register(scan);
 
-        Mapping reduceMapping = PlanUtils.computeAggFieldMapping(grpSets);
+        ImmutableBitSet grpSet = grpSets.get(0);
+        Mapping reduceMapping = Commons.trimmingMapping(grpSet.length(), 
grpSet);
         MapReduceAgg mapReduceAgg = 
MapReduceAggregates.createMapReduceAggCall(call, 
reduceMapping.getTargetCount());
 
         HashAggregateNode<Object[]> aggRdc = new HashAggregateNode<>(
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/SortAggregateExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/SortAggregateExecutionTest.java
index 7040cd02a3..f40ccda15c 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/SortAggregateExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/SortAggregateExecutionTest.java
@@ -37,7 +37,7 @@ import 
org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler;
 import org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates;
 import 
org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates.MapReduceAgg;
-import org.apache.ignite.internal.sql.engine.util.PlanUtils;
+import org.apache.ignite.internal.sql.engine.util.Commons;
 
 /**
  * SortAggregateExecutionTest.
@@ -124,7 +124,6 @@ public class SortAggregateExecutionTest extends 
BaseAggregateTest {
 
         // The group's fields placed on the begin of the output row (planner
         // does this by Projection node for aggregate input).
-        // Hash aggregate doesn't use groups set on reducer because send 
GroupKey as object.
         ImmutableIntList reduceGrpFields = ImmutableIntList.copyOf(
                 IntStream.range(0, 
grpSet.cardinality()).boxed().collect(Collectors.toList())
         );
@@ -137,8 +136,8 @@ public class SortAggregateExecutionTest extends 
BaseAggregateTest {
             rdcCmp = (k1, k2) -> 0;
         }
 
-        Mapping reduceMapping = PlanUtils.computeAggFieldMapping(grpSets);
-        MapReduceAgg mapReduceAgg = 
MapReduceAggregates.createMapReduceAggCall(call, 
reduceMapping.getTargetCount());
+        Mapping mapping = Commons.trimmingMapping(grpSet.length(), grpSet);
+        MapReduceAgg mapReduceAgg = 
MapReduceAggregates.createMapReduceAggCall(call, mapping.getTargetCount());
 
         SortAggregateNode<Object[]> aggRdc = new SortAggregateNode<>(
                 ctx,
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractAggregatePlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractAggregatePlannerTest.java
index f268ac7e49..09e176efe0 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractAggregatePlannerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractAggregatePlannerTest.java
@@ -29,8 +29,10 @@ import java.util.Objects;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.function.UnaryOperator;
+import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.ignite.internal.schema.NativeTypes;
 import 
org.apache.ignite.internal.sql.engine.framework.TestBuilders.TableBuilder;
 import org.apache.ignite.internal.sql.engine.rel.IgniteAggregate;
@@ -39,6 +41,7 @@ import 
org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Collation;
 import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
 import org.hamcrest.Matchers;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
@@ -847,6 +850,48 @@ public abstract class AbstractAggregatePlannerTest extends 
AbstractPlannerTest {
          * <p>Distribution identity(1)
          */
         CASE_23C("SELECT val0, AVG(val1) FROM test GROUP BY val0", 
schema(identity(1))),
+
+        /**
+         * Query: SELECT COUNT(val0), COUNT(DISTINCT(val1) from test.
+         *
+         * <p>Distribution single()
+         */
+        CASE_24_1("SELECT COUNT(val0), COUNT(DISTINCT(val1)) from test", 
schema(single())),
+
+        /**
+         * Query: SELECT COUNT(val0), COUNT(DISTINCT(val1) from test.
+         *
+         * <p>Distribution hash(0)
+         */
+        CASE_24_1A("SELECT COUNT(val0), COUNT(DISTINCT(val1)) from test", 
schema(hash(0))),
+
+        /**
+         * Query: SELECT COUNT(val0), COUNT(DISTINCT(val1) from test.
+         *
+         * <p>Distribution hash(1)
+         */
+        CASE_24_1B("SELECT COUNT(val0), COUNT(DISTINCT(val1)) from test", 
schema(hash(1))),
+
+        /**
+         * Query: SELECT COUNT(val0), COUNT(DISTINCT(val1) from test.
+         *
+         * <p>Distribution hash(2)
+         */
+        CASE_24_1C("SELECT COUNT(val0), COUNT(DISTINCT(val1)) from test", 
schema(hash(2))),
+
+        /**
+         * Query: SELECT COUNT(val0), COUNT(DISTINCT(val1) from test.
+         *
+         * <p>Distribution identity(1)
+         */
+        CASE_24_1D("SELECT COUNT(val0), COUNT(DISTINCT(val1)) from test", 
schema(identity(1))),
+
+        /**
+         * Query: SELECT COUNT(val0), COUNT(DISTINCT(val1) from test.
+         *
+         * <p>Distribution identity(2)
+         */
+        CASE_24_1E("SELECT COUNT(val0), COUNT(DISTINCT(val1)) from test", 
schema(identity(2))),
         ;
 
         final String query;
@@ -993,4 +1038,31 @@ public abstract class AbstractAggregatePlannerTest 
extends AbstractPlannerTest {
             return true;
         };
     }
+
+    <T> Predicate<T> hasGroupSets(Function<T, List<ImmutableBitSet>> 
groupSets, int groupKey) {
+        return (node) -> {
+            List<ImmutableBitSet> allGroupSets = groupSets.apply(node);
+            ImmutableBitSet firstGroupSet = allGroupSets.get(0);
+
+            boolean groupSetsMatch = 
allGroupSets.equals(List.of(ImmutableBitSet.of(groupKey)));
+            boolean groupSetMatches = 
firstGroupSet.equals(ImmutableBitSet.of(groupKey));
+
+            return groupSetMatches && groupSetsMatch;
+        };
+    }
+
+    <T> Predicate<T> hasNoGroupSets(Function<T, List<ImmutableBitSet>> 
groupSets) {
+        return (node) -> {
+            List<ImmutableBitSet> allGroupSets = groupSets.apply(node);
+            List<ImmutableBitSet> emptyGroupSets = 
List.of(ImmutableBitSet.of());
+            return emptyGroupSets.equals(allGroupSets);
+        };
+    }
+
+    <T extends RelNode> Predicate<T> hasCollation(RelCollation expected) {
+        return (node) -> {
+            RelCollation collation = TraitUtils.collation(node);
+            return expected.equals(collation);
+        };
+    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
index 8842af8cfc..11f3d46a9e 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
@@ -30,6 +30,7 @@ import 
org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
 import org.apache.ignite.internal.sql.engine.rel.IgniteLimit;
 import org.apache.ignite.internal.sql.engine.rel.IgniteMergeJoin;
 import org.apache.ignite.internal.sql.engine.rel.IgniteSort;
+import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
 import 
org.apache.ignite.internal.sql.engine.rel.agg.IgniteColocatedHashAggregate;
 import 
org.apache.ignite.internal.sql.engine.rel.agg.IgniteColocatedSortAggregate;
 import org.apache.ignite.internal.sql.engine.rel.agg.IgniteMapHashAggregate;
@@ -515,6 +516,45 @@ public class AggregatePlannerTest extends 
AbstractAggregatePlannerTest {
         assertPlan(TestCase.CASE_23C, colocatedGroupBy);
     }
 
+    /**
+     * Validates a plan for a query with two aggregates: one w/o DISTINCT and 
one with DISTINCT: single distribution.
+     */
+    @Test
+    public void countDistinctGroupSetSingle() throws Exception {
+        assertPlan(TestCase.CASE_24_1, 
isInstanceOf(IgniteColocatedHashAggregate.class)
+                
.and(hasNoGroupSets(IgniteColocatedHashAggregate::getGroupSets))
+                .and(input(isInstanceOf(IgniteTableScan.class)
+                )));
+    }
+
+    /**
+     * Validates a plan for a query with two aggregates: one w/o DISTINCT and 
one with DISTINCT: hash distribution.
+     */
+    @Test
+    public void countDistinctGroupSetHash() throws Exception {
+        checkCountDistinctHash(TestCase.CASE_24_1A);
+        checkCountDistinctHash(TestCase.CASE_24_1B);
+        checkCountDistinctHash(TestCase.CASE_24_1D);
+
+        Predicate<RelNode> colocated = 
nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
+                .and(hasNoGroupSets(IgniteReduceSortAggregate::getGroupSets))
+                .and(input(isInstanceOf(IgniteExchange.class)
+                        .and(hasDistribution(IgniteDistributions.single())
+                                
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
+                                        
.and(hasNoGroupSets(IgniteMapSortAggregate::getGroupSets))
+                                        
.and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
+                                                
.and(hasGroupSets(IgniteColocatedHashAggregate::getGroupSets, 1))
+                                        ))
+                                ))
+                        )
+                ))
+
+        );
+
+        assertPlan(TestCase.CASE_24_1C, colocated);
+        assertPlan(TestCase.CASE_24_1E, colocated);
+    }
+
     private void checkSimpleAggSingle(TestCase testCase) throws Exception {
         assertPlan(testCase,
                 nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class)
@@ -769,4 +809,19 @@ public class AggregatePlannerTest extends 
AbstractAggregatePlannerTest {
                         ))
         );
     }
+
+    private void checkCountDistinctHash(TestCase testCase) throws Exception {
+        assertPlan(testCase, 
nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class)
+                
.and(nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class)
+                        
.and(hasGroupSets(IgniteReduceHashAggregate::getGroupSets, 0))
+                        .and(input(isInstanceOf(IgniteExchange.class)
+                                
.and(hasDistribution(IgniteDistributions.single())
+                                        
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
+                                                
.and(hasGroupSets(IgniteMapHashAggregate::getGroupSets, 1))
+                                        ))
+                                )
+                        ))
+                ))
+        ));
+    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java
index 54fe13c0bb..eba053ffab 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java
@@ -30,6 +30,7 @@ import 
org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
 import org.apache.ignite.internal.sql.engine.rel.IgniteLimit;
 import org.apache.ignite.internal.sql.engine.rel.IgniteMergeJoin;
 import org.apache.ignite.internal.sql.engine.rel.IgniteSort;
+import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
 import 
org.apache.ignite.internal.sql.engine.rel.agg.IgniteColocatedHashAggregate;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
@@ -459,6 +460,40 @@ public class ColocatedHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
 
     }
 
+    /**
+     * Validates a plan for a query with two aggregates: one w/o DISTINCT and 
one with DISTINCT: single distribution.
+     */
+    @Test
+    public void countDistinctGroupSetSingle() throws Exception {
+        assertPlan(TestCase.CASE_24_1, 
isInstanceOf(IgniteColocatedHashAggregate.class)
+                
.and(hasNoGroupSets(IgniteColocatedHashAggregate::getGroupSets))
+                .and(input(isInstanceOf(IgniteTableScan.class)
+                )), disableRules);
+    }
+
+    /**
+     * Validates a plan for a query with two aggregates: one w/o DISTINCT and 
one with DISTINCT: hash distribution.
+     */
+    @Test
+    public void countDistinctGroupSetHash() throws Exception {
+        checkCountDistinctHash(TestCase.CASE_24_1A);
+        checkCountDistinctHash(TestCase.CASE_24_1B);
+        checkCountDistinctHash(TestCase.CASE_24_1D);
+
+        Predicate<RelNode> colocated2 = 
nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class)
+                
.and(hasNoGroupSets(IgniteColocatedHashAggregate::getGroupSets))
+                .and(input(isInstanceOf(IgniteExchange.class)
+                        .and(hasDistribution(IgniteDistributions.single()))
+                        
.and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
+                                
.and(hasGroupSets(IgniteColocatedHashAggregate::getGroupSets, 1))
+                                
.and(input(isInstanceOf(IgniteTableScan.class))))
+                        ))
+                ));
+
+        assertPlan(TestCase.CASE_24_1C, colocated2, disableRules);
+        assertPlan(TestCase.CASE_24_1E, colocated2, disableRules);
+    }
+
     private void checkSimpleAggSingle(TestCase testCase) throws Exception {
         assertPlan(testCase,
                 nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class)
@@ -690,4 +725,13 @@ public class ColocatedHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
                 disableRules
         );
     }
+
+    private void checkCountDistinctHash(TestCase testCase) throws Exception {
+        assertPlan(testCase, isInstanceOf(IgniteColocatedHashAggregate.class)
+                
.and(hasNoGroupSets(IgniteColocatedHashAggregate::getGroupSets))
+                .and(input(isInstanceOf(IgniteExchange.class)
+                        .and(hasDistribution(IgniteDistributions.single()))
+                        .and(input(isInstanceOf(IgniteTableScan.class))))
+                ), disableRules);
+    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java
index 4c66544ec5..81f586e5f9 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java
@@ -30,6 +30,7 @@ import 
org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
 import org.apache.ignite.internal.sql.engine.rel.IgniteLimit;
 import org.apache.ignite.internal.sql.engine.rel.IgniteMergeJoin;
 import org.apache.ignite.internal.sql.engine.rel.IgniteSort;
+import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
 import 
org.apache.ignite.internal.sql.engine.rel.agg.IgniteColocatedSortAggregate;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
@@ -461,6 +462,29 @@ public class ColocatedSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
         assertPlan(TestCase.CASE_23C, checkPlan, disableRules);
     }
 
+    /**
+     * Validate query with two aggregates: one w/o DISTINCT and one with 
DISTINCT.
+     */
+    @Test
+    public void countDistinctGroupSetSingle() throws Exception {
+        assertPlan(TestCase.CASE_24_1, 
isInstanceOf(IgniteColocatedSortAggregate.class)
+                
.and(hasNoGroupSets(IgniteColocatedSortAggregate::getGroupSets))
+                .and(input(isInstanceOf(IgniteTableScan.class)
+                )), disableRules);
+    }
+
+    /**
+     * Validate query with two aggregates: one w/o DISTINCT and one with 
DISTINCT: hash distribution.
+     */
+    @Test
+    public void countDistinctGroupSetHash() throws Exception {
+        checkCountDistinctHash(TestCase.CASE_24_1A);
+        checkCountDistinctHash(TestCase.CASE_24_1B);
+        checkCountDistinctHash(TestCase.CASE_24_1C);
+        checkCountDistinctHash(TestCase.CASE_24_1D);
+        checkCountDistinctHash(TestCase.CASE_24_1E);
+    }
+
     private void checkSimpleAggSingle(TestCase testCase) throws Exception {
         assertPlan(testCase,
                 nodeOrAnyChild(isInstanceOf(IgniteColocatedSortAggregate.class)
@@ -726,4 +750,13 @@ public class ColocatedSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
                 disableRules
         );
     }
+
+    private void checkCountDistinctHash(TestCase testCase) throws Exception {
+        assertPlan(testCase, isInstanceOf(IgniteColocatedSortAggregate.class)
+                
.and(hasNoGroupSets(IgniteColocatedSortAggregate::getGroupSets))
+                .and(input(isInstanceOf(IgniteExchange.class)
+                        .and(hasDistribution(IgniteDistributions.single()))
+                        .and(input(isInstanceOf(IgniteTableScan.class))))
+                ), disableRules);
+    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceAggregatesTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceAggregatesTest.java
new file mode 100644
index 0000000000..fc8053ac4c
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceAggregatesTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.planner;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalValues;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.mapping.Mapping;
+import org.apache.calcite.util.mapping.MappingType;
+import org.apache.calcite.util.mapping.Mappings;
+import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
+import org.apache.ignite.internal.sql.engine.rel.IgniteValues;
+import org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates;
+import 
org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates.AggregateRelBuilder;
+import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
+import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link MapReduceAggregates}.
+ */
+public class MapReduceAggregatesTest {
+
+    /**
+     * Checks that mapping is applied to {@code groupSet} and {@code 
groupSets} arguments to methods of {@link AggregateRelBuilder}.
+     */
+    @Test
+    public void testGroupSetMapping() {
+        IgniteTypeFactory typeFactory = Commons.typeFactory();
+
+        RelDataType rowType = typeFactory.builder().add("f1", 
SqlTypeName.INTEGER)
+                .add("f2", SqlTypeName.INTEGER)
+                .build();
+
+        RelOptCluster cluster = Commons.cluster();
+        RelTraitSet traitSet = RelTraitSet.createEmpty();
+
+        // Input to aggregate
+        LogicalValues values = new LogicalValues(cluster, traitSet, rowType, 
ImmutableList.of());
+
+        // Aggregate
+        ImmutableBitSet groupSet = ImmutableBitSet.of(1);
+        AggregateCall aggregateCall = newCall(typeFactory, List.of(1));
+
+        LogicalAggregate aggregate = new LogicalAggregate(cluster, traitSet, 
List.of(), values,
+                groupSet, List.of(groupSet), List.of(aggregateCall));
+
+        Mapping mapping = Mappings.create(MappingType.PARTIAL_FUNCTION, 2, 5);
+        mapping.set(1, 3);
+
+        GroupSetCollector collect = new GroupSetCollector(rowType);
+
+        MapReduceAggregates.buildAggregates(aggregate, collect, mapping);
+
+        Pair<ImmutableBitSet, List<ImmutableBitSet>> mapGroups = 
collect.collectedGroupSets.get(0);
+        Pair<ImmutableBitSet, List<ImmutableBitSet>> reduceGroups = 
collect.collectedGroupSets.get(1);
+        ImmutableBitSet mappedGroupSet = Mappings.apply(mapping, groupSet);
+
+        assertEquals(Pair.of(groupSet, List.of(groupSet)), mapGroups, "group 
sets on MAP phase");
+        assertEquals(Pair.of(mappedGroupSet, List.of(mappedGroupSet)), 
reduceGroups, "group sets on REDUCE phase");
+    }
+
+    private static AggregateCall newCall(IgniteTypeFactory typeFactory, 
List<Integer> args) {
+        return AggregateCall.create(SqlStdOperatorTable.COUNT,
+                false, false, false, args, -1, null,
+                RelCollations.EMPTY,
+                typeFactory.createSqlType(SqlTypeName.BIGINT),
+                "count");
+    }
+
+    private static class GroupSetCollector implements AggregateRelBuilder {
+        private final List<Pair<ImmutableBitSet, List<ImmutableBitSet>>> 
collectedGroupSets = new ArrayList<>();
+        private final RelDataType rowType;
+
+        GroupSetCollector(RelDataType rowType) {
+            this.rowType = rowType;
+        }
+
+        @Override
+        public IgniteRel makeMapAgg(RelOptCluster cluster,
+                RelNode input,
+                ImmutableBitSet groupSet,
+                List<ImmutableBitSet> groupSets,
+                List<AggregateCall> aggregateCalls) {
+
+            collectedGroupSets.add(Pair.of(groupSet, groupSets));
+
+            return createOutExpr(cluster, input);
+        }
+
+        @Override
+        public IgniteRel makeReduceAgg(RelOptCluster cluster,
+                RelNode map,
+                ImmutableBitSet groupSet,
+                List<ImmutableBitSet> groupSets,
+                List<AggregateCall> aggregateCalls,
+                RelDataType outputType) {
+
+            collectedGroupSets.add(Pair.of(groupSet, groupSets));
+
+            return createOutExpr(cluster, map);
+        }
+
+        private IgniteValues createOutExpr(RelOptCluster cluster, RelNode 
input) {
+            // Expression can be of any type and implementation of 
MapReduceAggregates should not probe into it.
+            return new IgniteValues(cluster, rowType, ImmutableList.of(), 
input.getTraitSet());
+        }
+    }
+}
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java
index a525b719eb..392d56843b 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java
@@ -27,6 +27,7 @@ import java.util.Objects;
 import java.util.function.Predicate;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
 import 
org.apache.ignite.internal.sql.engine.rel.IgniteCorrelatedNestedLoopJoin;
 import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
@@ -37,6 +38,7 @@ import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
 import org.apache.ignite.internal.sql.engine.rel.IgniteSort;
 import org.apache.ignite.internal.sql.engine.rel.agg.IgniteMapHashAggregate;
 import org.apache.ignite.internal.sql.engine.rel.agg.IgniteReduceHashAggregate;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
 import org.apache.ignite.internal.util.ArrayUtils;
 import org.junit.jupiter.api.Test;
@@ -512,6 +514,39 @@ public class MapReduceHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
         assertThat(e.getMessage(), containsString("There are not enough rules 
to produce a node with desired properties"));
     }
 
+    /**
+     * Validate query with two aggregates: one w/o DISTINCT and one with 
DISTINCT: hash distribution.
+     */
+    @Test
+    public void countDistinctGroupSetSingle() throws Exception {
+        Predicate<IgniteReduceHashAggregate> inputAgg = 
isInstanceOf(IgniteReduceHashAggregate.class)
+                .and(hasGroupSets(IgniteReduceHashAggregate::getGroupSets, 0))
+                .and(input(isInstanceOf(IgniteMapHashAggregate.class)
+                        .and(hasGroupSets(Aggregate::getGroupSets, 1))
+                ));
+
+        assertPlan(TestCase.CASE_24_1, 
nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class)
+                        
.and(hasNoGroupSets(IgniteReduceHashAggregate::getGroupSets))
+                        .and(input(isInstanceOf(IgniteMapHashAggregate.class)
+                                
.and(hasNoGroupSets(IgniteMapHashAggregate::getGroupSets))
+                                
.and(input(isInstanceOf(IgniteProject.class).and(input(inputAgg)))
+                                ))
+                        )),
+                disableRules);
+    }
+
+    /**
+     * Validates a plan for a query with two aggregates: one w/o DISTINCT and 
one with DISTINCT: single distribution.
+     */
+    @Test
+    public void countDistinctGroupSetHash() throws Exception {
+        checkCountDistinctHash(TestCase.CASE_24_1A);
+        checkCountDistinctHash(TestCase.CASE_24_1B);
+        checkCountDistinctHash(TestCase.CASE_24_1C);
+        checkCountDistinctHash(TestCase.CASE_24_1D);
+        checkCountDistinctHash(TestCase.CASE_24_1E);
+    }
+
     private void checkSimpleAggSingle(TestCase testCase) throws Exception {
         assertPlan(testCase,
                 nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class)
@@ -746,4 +781,25 @@ public class MapReduceHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
                 disableRules
         );
     }
+
+
+    private void checkCountDistinctHash(TestCase testCase) throws Exception {
+        Predicate<IgniteReduceHashAggregate> inputAgg = 
isInstanceOf(IgniteReduceHashAggregate.class)
+                .and(hasGroupSets(IgniteReduceHashAggregate::getGroupSets, 0))
+                .and(input(isInstanceOf(IgniteExchange.class)
+                        .and(hasDistribution(IgniteDistributions.single()))
+                        .and(input(isInstanceOf(IgniteMapHashAggregate.class)
+                                .and(hasGroupSets(Aggregate::getGroupSets, 1))
+                        ))
+                ));
+
+        assertPlan(testCase, 
nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class)
+                        
.and(hasNoGroupSets(IgniteReduceHashAggregate::getGroupSets))
+                        .and(input(isInstanceOf(IgniteMapHashAggregate.class)
+                                
.and(hasNoGroupSets(IgniteMapHashAggregate::getGroupSets))
+                                
.and(input(isInstanceOf(IgniteProject.class).and(input(inputAgg)))
+                                ))
+                        )),
+                disableRules);
+    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java
index a02267ab11..e39bc01c44 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java
@@ -26,7 +26,9 @@ import java.util.List;
 import java.util.Objects;
 import java.util.function.Predicate;
 import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
 import 
org.apache.ignite.internal.sql.engine.rel.IgniteCorrelatedNestedLoopJoin;
 import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
@@ -37,6 +39,7 @@ import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
 import org.apache.ignite.internal.sql.engine.rel.IgniteSort;
 import org.apache.ignite.internal.sql.engine.rel.agg.IgniteMapSortAggregate;
 import org.apache.ignite.internal.sql.engine.rel.agg.IgniteReduceSortAggregate;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
 import org.apache.ignite.internal.util.ArrayUtils;
 import org.junit.jupiter.api.Test;
@@ -494,6 +497,46 @@ public class MapReduceSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
         assertThat(e.getMessage(), containsString("There are not enough rules 
to produce a node with desired properties"));
     }
 
+    /**
+     * Validates a plan for a query with two aggregates: one w/o DISTINCT and 
one with DISTINCT: single distribution.
+     */
+    @Test
+    public void countDistinctGroupSetSingle() throws Exception {
+        Predicate<IgniteReduceSortAggregate> inputAgg = 
isInstanceOf(IgniteReduceSortAggregate.class)
+                .and(hasGroupSets(IgniteReduceSortAggregate::getGroupSets, 0))
+                .and(hasCollation(RelCollations.of(0)))
+                .and(input(isInstanceOf(IgniteExchange.class)
+                        .and(hasCollation(RelCollations.of(0)))
+                        .and(hasDistribution(IgniteDistributions.single()))
+                        .and(input(isInstanceOf(IgniteMapSortAggregate.class)
+                                .and(hasCollation(RelCollations.of(1)))
+                                .and(hasGroupSets(Aggregate::getGroupSets, 1))
+                ))));
+
+        assertPlan(TestCase.CASE_24_1, 
nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
+                        
.and(hasNoGroupSets(IgniteReduceSortAggregate::getGroupSets))
+                        .and(hasCollation(RelCollations.EMPTY))
+                        .and(input(isInstanceOf(IgniteMapSortAggregate.class)
+                                
.and(hasNoGroupSets(IgniteMapSortAggregate::getGroupSets))
+                                .and(hasCollation(RelCollations.EMPTY))
+                                
.and(input(isInstanceOf(IgniteProject.class).and(input(inputAgg)))
+                                ))
+                        )),
+                disableRules);
+    }
+
+    /**
+     * Validates a plan for a query with two aggregates: one w/o DISTINCT and 
one with DISTINCT: hash distribution.
+     */
+    @Test
+    public void countDistinctGroupSetHash() throws Exception {
+        checkCountDistinctHash(TestCase.CASE_24_1A);
+        checkCountDistinctHash(TestCase.CASE_24_1B);
+        checkCountDistinctHash(TestCase.CASE_24_1C);
+        checkCountDistinctHash(TestCase.CASE_24_1D);
+        checkCountDistinctHash(TestCase.CASE_24_1E);
+    }
+
     private void checkSimpleAggSingle(TestCase testCase) throws Exception {
         assertPlan(testCase,
                 nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
@@ -821,4 +864,28 @@ public class MapReduceSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
                 disableRules
         );
     }
+
+    private void checkCountDistinctHash(TestCase testCase) throws Exception {
+        Predicate<IgniteReduceSortAggregate> inputAgg = 
isInstanceOf(IgniteReduceSortAggregate.class)
+                .and(hasGroupSets(IgniteReduceSortAggregate::getGroupSets, 0))
+                .and(hasCollation(RelCollations.of(0)))
+                .and(input(isInstanceOf(IgniteExchange.class)
+                        .and(hasDistribution(IgniteDistributions.single()))
+                        .and(hasCollation(RelCollations.of(0)))
+                        .and(input(isInstanceOf(IgniteMapSortAggregate.class)
+                                .and(hasGroupSets(Aggregate::getGroupSets, 1))
+                        ))
+                ));
+
+        assertPlan(testCase, 
nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
+                        
.and(hasNoGroupSets(IgniteReduceSortAggregate::getGroupSets))
+                        .and(hasCollation(RelCollations.EMPTY))
+                        .and(input(isInstanceOf(IgniteMapSortAggregate.class)
+                                
.and(hasNoGroupSets(IgniteMapSortAggregate::getGroupSets))
+                                .and(hasCollation(RelCollations.EMPTY))
+                                
.and(input(isInstanceOf(IgniteProject.class).and(input(inputAgg)))
+                                ))
+                        )),
+                disableRules);
+    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/CommonsTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/CommonsTest.java
index 5c8de83374..344e94e021 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/CommonsTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/CommonsTest.java
@@ -21,7 +21,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.mapping.Mapping;
-import org.apache.calcite.util.mapping.MappingType;
 import org.apache.calcite.util.mapping.Mappings;
 import org.junit.jupiter.api.Test;
 
@@ -31,24 +30,19 @@ import org.junit.jupiter.api.Test;
 public class CommonsTest {
 
     @Test
-    public void testMapBitSet() {
-        Mapping mapping = Mappings.create(MappingType.PARTIAL_FUNCTION, 7, 7);
-        mapping.set(0, 1);
-        mapping.set(4, 6);
-        mapping.set(1, 4);
+    public void testTrimmingMapping() {
+        ImmutableBitSet requiredElements = ImmutableBitSet.of(1, 4, 3, 7);
 
-        ImmutableBitSet bitSet = ImmutableBitSet.of(1, 0, 4);
-
-        ImmutableBitSet actual = Commons.mapBitSet(bitSet, mapping);
-        assertEquals(ImmutableBitSet.of(1, 4, 6), actual);
+        Mapping mapping = Commons.trimmingMapping(requiredElements.length(), 
requiredElements);
+        expectMapped(mapping, ImmutableBitSet.of(1, 7), ImmutableBitSet.of(0, 
3));
+        expectMapped(mapping, ImmutableBitSet.of(3), ImmutableBitSet.of(1));
+        expectMapped(mapping, ImmutableBitSet.of(1, 4, 3, 7), 
ImmutableBitSet.of(0, 1, 2, 3));
     }
 
-    @Test
-    public void testMapEmptyBitSet() {
-        Mapping mapping = Mappings.createIdentity(1);
-        ImmutableBitSet bitSet = ImmutableBitSet.of();
+    private static void expectMapped(Mapping mapping, ImmutableBitSet bitSet, 
ImmutableBitSet expected) {
+        assertEquals(expected, Mappings.apply(mapping, bitSet), "direct 
mapping");
 
-        ImmutableBitSet actual = Commons.mapBitSet(bitSet, mapping);
-        assertEquals(ImmutableBitSet.of(), actual);
+        Mapping inverseMapping = mapping.inverse();
+        assertEquals(bitSet, Mappings.apply(inverseMapping, expected), 
"inverse mapping");
     }
 }


Reply via email to