This is an automated email from the ASF dual-hosted git repository.
korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 950a3c0428 IGNITE-20164 Sql. Incorrect propagation of RelCollation
trait for Sort-based map/reduce aggregates (#2461)
950a3c0428 is described below
commit 950a3c0428e83bf717c5430a91470cf3aee27d58
Author: Max Zhuravkov <[email protected]>
AuthorDate: Fri Sep 1 14:08:13 2023 +0300
IGNITE-20164 Sql. Incorrect propagation of RelCollation trait for
Sort-based map/reduce aggregates (#2461)
---
.../internal/sql/engine/ItAggregatesTest.java | 411 ++++++++++-----------
.../sql/engine/rel/agg/IgniteMapSortAggregate.java | 2 +-
.../engine/rel/agg/IgniteSortAggregateBase.java | 33 +-
.../sql/engine/rel/agg/MapReduceAggregates.java | 25 +-
.../engine/rule/HashAggregateConverterRule.java | 20 +-
.../engine/rule/SortAggregateConverterRule.java | 32 +-
.../engine/rule/logical/LogicalOrToUnionRule.java | 2 +-
.../sql/engine/trait/TraitsAwareIgniteRel.java | 2 +-
.../ignite/internal/sql/engine/util/Commons.java | 54 +--
.../ignite/internal/sql/engine/util/PlanUtils.java | 40 +-
.../ignite/internal/sql/engine/util/RexUtils.java | 6 +-
.../exec/rel/HashAggregateExecutionTest.java | 5 +-
.../exec/rel/SortAggregateExecutionTest.java | 7 +-
.../planner/AbstractAggregatePlannerTest.java | 72 ++++
.../sql/engine/planner/AggregatePlannerTest.java | 55 +++
.../planner/ColocatedHashAggregatePlannerTest.java | 44 +++
.../planner/ColocatedSortAggregatePlannerTest.java | 33 ++
.../engine/planner/MapReduceAggregatesTest.java | 138 +++++++
.../planner/MapReduceHashAggregatePlannerTest.java | 56 +++
.../planner/MapReduceSortAggregatePlannerTest.java | 67 ++++
.../internal/sql/engine/util/CommonsTest.java | 26 +-
21 files changed, 763 insertions(+), 367 deletions(-)
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItAggregatesTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItAggregatesTest.java
index ace179bcec..e6b5f41a95 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItAggregatesTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItAggregatesTest.java
@@ -57,7 +57,7 @@ public class ItAggregatesTest extends
ClusterPerClassIntegrationTest {
* Before all.
*/
@BeforeAll
- static void initTestData() throws InterruptedException {
+ static void initTestData() {
createAndPopulateTable();
sql("CREATE ZONE test_zone with replicas=2, partitions=10");
@@ -71,6 +71,25 @@ public class ItAggregatesTest extends
ClusterPerClassIntegrationTest {
sql("INSERT INTO test (id, grp0, grp1, val0, val1) VALUES (?, ?,
?, ?, ?)", i, i / 10, i / 100, 1, 2);
sql("INSERT INTO test_one_col_idx (pk, col0) VALUES (?, ?)", i, i);
}
+
+ sql("CREATE TABLE t1_colo_val1(id INT, val0 VARCHAR, val1 VARCHAR,
val2 VARCHAR, PRIMARY KEY(id, val1)) "
+ + "COLOCATE BY (val1)");
+
+ sql("CREATE TABLE t2_colo_va1(id INT, val0 VARCHAR, val1 VARCHAR, val2
VARCHAR, PRIMARY KEY(id, val1)) "
+ + "COLOCATE BY (val1)");
+
+ for (int i = 0; i < 100; i++) {
+ sql("INSERT INTO t1_colo_val1 VALUES (?, ?, ?, ?)", i, "val" + i,
"val" + i % 2, "val" + i);
+ }
+
+ sql("INSERT INTO t2_colo_va1 VALUES (0, 'val0', 'val0', 'val0'), (1,
'val1', 'val1', 'val1')");
+
+ sql("CREATE TABLE test_a_b_s (id INTEGER PRIMARY KEY, a INTEGER, b
INTEGER, s VARCHAR);");
+ sql("INSERT INTO test_a_b_s VALUES (1, 11, 1, 'hello'), (2, 12, 2,
'world'), (3, 11, 3, NULL)");
+ sql("INSERT INTO test_a_b_s VALUES (4, 11, 3, 'hello'), (5, 12, 2,
'world'), (6, 10, 5, 'ahello'), (7, 13, 6, 'world')");
+
+ sql("CREATE TABLE test_str_int_real_dec "
+ + "(id INTEGER PRIMARY KEY, str_col VARCHAR, int_col INTEGER,
real_col REAL, dec_col DECIMAL)");
}
@ParameterizedTest
@@ -267,94 +286,63 @@ public class ItAggregatesTest extends
ClusterPerClassIntegrationTest {
@Test
public void testColocatedAggregate() {
- try {
- sql("CREATE TABLE t1(id INT, val0 VARCHAR, val1 VARCHAR, val2
VARCHAR, PRIMARY KEY(id, val1)) "
- + "COLOCATE BY (val1)");
-
- sql("CREATE TABLE t2(id INT, val0 VARCHAR, val1 VARCHAR, val2
VARCHAR, PRIMARY KEY(id, val1)) "
- + "COLOCATE BY (val1)");
+ String sql = "SELECT val1, count(val2) FROM t1_colo_val1 GROUP BY
val1";
- for (int i = 0; i < 100; i++) {
- sql("INSERT INTO t1 VALUES (?, ?, ?, ?)", i, "val" + i, "val"
+ i % 2, "val" + i);
- }
-
- sql("INSERT INTO t2 VALUES (0, 'val0', 'val0', 'val0'), (1,
'val1', 'val1', 'val1')");
-
- String sql = "SELECT val1, count(val2) FROM t1 GROUP BY val1";
-
- assertQuery(sql)
-
.matches(QueryChecker.matches(".*Exchange.*Colocated.*Aggregate.*"))
- .returns("val0", 50L)
- .returns("val1", 50L)
- .check();
+ assertQuery(sql)
+
.matches(QueryChecker.matches(".*Exchange.*Colocated.*Aggregate.*"))
+ .returns("val0", 50L)
+ .returns("val1", 50L)
+ .check();
- sql = "SELECT t2.val1, agg.cnt "
- + "FROM t2 JOIN (SELECT val1, COUNT(val2) AS cnt FROM t1
GROUP BY val1) AS agg ON t2.val1 = agg.val1";
+ sql = "SELECT t2_colo_va1.val1, agg.cnt "
+ + "FROM t2_colo_va1 JOIN (SELECT val1, COUNT(val2) AS cnt FROM
t1_colo_val1 GROUP BY val1) "
+ + "AS agg ON t2_colo_va1.val1 = agg.val1";
- assertQuery(sql)
-
.matches(QueryChecker.matches(".*Exchange.*Join.*Colocated.*Aggregate.*"))
- .returns("val0", 50L)
- .returns("val1", 50L)
- .check();
- } finally {
- sql("DROP TABLE IF EXISTS t1");
- sql("DROP TABLE IF EXISTS t2");
- }
+ assertQuery(sql)
+
.matches(QueryChecker.matches(".*Exchange.*Join.*Colocated.*Aggregate.*"))
+ .returns("val0", 50L)
+ .returns("val1", 50L)
+ .check();
}
@ParameterizedTest
@MethodSource("provideRules")
public void testColocatedAggregate(String[] rules) {
- try {
- sql("CREATE TABLE t1(id INT, val0 VARCHAR, val1 VARCHAR, val2
VARCHAR, PRIMARY KEY(id, val1)) "
- + "COLOCATE BY (val1)");
-
- sql("CREATE TABLE t2(id INT, val0 VARCHAR, val1 VARCHAR, val2
VARCHAR, PRIMARY KEY(id, val1)) "
- + "COLOCATE BY (val1)");
-
- for (int i = 0; i < 100; i++) {
- sql("INSERT INTO t1 VALUES (?, ?, ?, ?)", i, "val" + i, "val"
+ i % 2, "val" + i);
- }
-
- sql("INSERT INTO t2 VALUES (0, 'val0', 'val0', 'val0'), (1,
'val1', 'val1', 'val1')");
+ String sql = "SELECT val1, count(val2) FROM t1_colo_val1 GROUP BY
val1";
- String sql = "SELECT val1, count(val2) FROM t1 GROUP BY val1";
-
- assertQuery(sql)
- .disableRules(rules)
- .returns("val0", 50L)
- .returns("val1", 50L)
- .check();
+ assertQuery(sql)
+ .disableRules(rules)
+ .returns("val0", 50L)
+ .returns("val1", 50L)
+ .check();
- sql = "SELECT t2.val1, agg.cnt "
- + "FROM t2 JOIN (SELECT val1, COUNT(val2) AS cnt FROM t1
GROUP BY val1) AS agg ON t2.val1 = agg.val1";
+ sql = "SELECT t2_colo_va1.val1, agg.cnt "
+ + "FROM t2_colo_va1 JOIN (SELECT val1, COUNT(val2) AS cnt FROM
t1_colo_val1 GROUP BY val1) "
+ + "AS agg ON t2_colo_va1.val1 = agg.val1";
- assertQuery(sql)
- .disableRules(rules)
- .returns("val0", 50L)
- .returns("val1", 50L)
- .check();
- } finally {
- sql("DROP TABLE IF EXISTS t1");
- sql("DROP TABLE IF EXISTS t2");
- }
+ assertQuery(sql)
+ .disableRules(rules)
+ .returns("val0", 50L)
+ .returns("val1", 50L)
+ .check();
}
@Test
public void testEverySomeAggregate() {
- sql("CREATE TABLE t(c0 INT PRIMARY KEY, c1 INT, c2 INT)");
- sql("INSERT INTO t VALUES (1, null, 0)");
- sql("INSERT INTO t VALUES (2, 0, null)");
- sql("INSERT INTO t VALUES (3, null, null)");
- sql("INSERT INTO t VALUES (4, 0, 1)");
- sql("INSERT INTO t VALUES (5, 1, 1)");
- sql("INSERT INTO t VALUES (6, 1, 2)");
- sql("INSERT INTO t VALUES (7, 2, 2)");
-
- assertQuery("SELECT EVERY(c1 < c2) FROM t").returns(false).check();
- assertQuery("SELECT SOME(c1 < c2) FROM t").returns(true).check();
- assertQuery("SELECT EVERY(c1 <= c2) FROM t").returns(true).check();
- assertQuery("SELECT SOME(c1 > c2) FROM t").returns(false).check();
+ sql("DELETE FROM test_a_b_s");
+
+ sql("INSERT INTO test_a_b_s(id, a, b) VALUES (1, null, 0)");
+ sql("INSERT INTO test_a_b_s(id, a, b) VALUES (2, 0, null)");
+ sql("INSERT INTO test_a_b_s(id, a, b) VALUES (3, null, null)");
+ sql("INSERT INTO test_a_b_s(id, a, b) VALUES (4, 0, 1)");
+ sql("INSERT INTO test_a_b_s(id, a, b) VALUES (5, 1, 1)");
+ sql("INSERT INTO test_a_b_s(id, a, b) VALUES (6, 1, 2)");
+ sql("INSERT INTO test_a_b_s(id, a, b) VALUES (7, 2, 2)");
+
+ assertQuery("SELECT EVERY(a < b) FROM
test_a_b_s").returns(false).check();
+ assertQuery("SELECT SOME(a < b) FROM
test_a_b_s").returns(true).check();
+ assertQuery("SELECT EVERY(a <= b) FROM
test_a_b_s").returns(true).check();
+ assertQuery("SELECT SOME(a > b) FROM
test_a_b_s").returns(false).check();
}
@Test
@@ -390,199 +378,188 @@ public class ItAggregatesTest extends
ClusterPerClassIntegrationTest {
@MethodSource("provideRules")
@WithSystemProperty(key = "IMPLICIT_PK_ENABLED", value = "true")
public void testDifferentAgg(String[] rules) {
- try {
- sql("CREATE TABLE testMe (a INTEGER, b INTEGER, s VARCHAR);");
- sql("INSERT INTO testMe VALUES (11, 1, 'hello'), (12, 2, 'world'),
(11, 3, NULL)");
- sql("INSERT INTO testMe VALUES (11, 3, 'hello'), (12, 2, 'world'),
(10, 5, 'ahello'), (13, 6, 'world')");
-
- assertQuery("SELECT DISTINCT(a) as a FROM testMe ORDER BY a")
- .disableRules(rules)
- .returns(10)
- .returns(11)
- .returns(12)
- .returns(13)
- .check();
-
- assertQuery("SELECT COUNT(*) FROM testMe")
- .disableRules(rules)
- .returns(7L)
- .check();
+ assertQuery("SELECT DISTINCT(a) as a FROM test_a_b_s ORDER BY a")
+ .disableRules(rules)
+ .returns(10)
+ .returns(11)
+ .returns(12)
+ .returns(13)
+ .check();
- // Such kind of queries can`t be processed with
- if (Arrays.stream(rules).anyMatch(r ->
r.contains("MapReduceSortAggregateConverterRule"))) {
- assertQuery("SELECT COUNT(a), COUNT(DISTINCT(b)) FROM testMe")
- .disableRules(rules)
- .returns(7L, 5L)
- .check();
- }
+ assertQuery("SELECT COUNT(*) FROM test_a_b_s")
+ .disableRules(rules)
+ .returns(7L)
+ .check();
- assertQuery("SELECT COUNT(a) as a, s FROM testMe GROUP BY s ORDER
BY a, s")
+ // Queries of this kind can't be processed with MapReduceSortAggregateConverterRule, so it must be disabled.
+ if (Arrays.stream(rules).anyMatch(r ->
r.contains("MapReduceSortAggregateConverterRule"))) {
+ assertQuery("SELECT COUNT(a), COUNT(DISTINCT(b)) FROM test_a_b_s")
.disableRules(rules)
- .returns(1L, "ahello")
- .returns(1L, null)
- .returns(2L, "hello")
- .returns(3L, "world")
+ .returns(7L, 5L)
.check();
+ }
- if (Arrays.stream(rules).noneMatch(MAP_REDUCE_RULES::contains)) {
- assertQuery("SELECT COUNT(a) as a, AVG(a) as b, MIN(a),
MIN(b), s FROM testMe GROUP BY s ORDER BY a, b")
- .disableRules(rules)
- .returns(1L, 10, 10, 5, "ahello")
- .returns(1L, 11, 11, 3, null)
- .returns(2L, 11, 11, 1, "hello")
- .returns(3L, 12, 12, 2, "world")
- .check();
-
- assertQuery("SELECT COUNT(a) as a, AVG(a) as bb, MIN(a),
MIN(b), s FROM testMe GROUP BY s, b ORDER BY a, s")
- .disableRules(rules)
- .returns(1L, 10, 10, 5, "ahello")
- .returns(1L, 11, 11, 1, "hello")
- .returns(1L, 11, 11, 3, "hello")
- .returns(1L, 13, 13, 6, "world")
- .returns(1L, 11, 11, 3, null)
- .returns(2L, 12, 12, 2, "world")
- .check();
- }
-
+ assertQuery("SELECT COUNT(a) as a, s FROM test_a_b_s GROUP BY s ORDER
BY a, s")
+ .disableRules(rules)
+ .returns(1L, "ahello")
+ .returns(1L, null)
+ .returns(2L, "hello")
+ .returns(3L, "world")
+ .check();
- assertQuery("SELECT COUNT(a) FROM testMe")
+ if (Arrays.stream(rules).noneMatch(MAP_REDUCE_RULES::contains)) {
+ assertQuery("SELECT COUNT(a) as a, AVG(a) as b, MIN(a), MIN(b), s
FROM test_a_b_s GROUP BY s ORDER BY a, b")
.disableRules(rules)
- .returns(7L)
+ .returns(1L, 10, 10, 5, "ahello")
+ .returns(1L, 11, 11, 3, null)
+ .returns(2L, 11, 11, 1, "hello")
+ .returns(3L, 12, 12, 2, "world")
.check();
- assertQuery("SELECT COUNT(DISTINCT(a)) FROM testMe")
+ assertQuery("SELECT COUNT(a) as a, AVG(a) as bb, MIN(a), MIN(b), s
FROM test_a_b_s GROUP BY s, b ORDER BY a, s")
.disableRules(rules)
- .returns(4L)
+ .returns(1L, 10, 10, 5, "ahello")
+ .returns(1L, 11, 11, 1, "hello")
+ .returns(1L, 11, 11, 3, "hello")
+ .returns(1L, 13, 13, 6, "world")
+ .returns(1L, 11, 11, 3, null)
+ .returns(2L, 12, 12, 2, "world")
.check();
+ }
- assertQuery("SELECT COUNT(a), COUNT(s), COUNT(*) FROM testMe")
- .disableRules(rules)
- .returns(7L, 6L, 7L)
- .check();
- if (Arrays.stream(rules).noneMatch(MAP_REDUCE_RULES::contains)) {
- assertQuery("SELECT AVG(a) FROM testMe")
- .disableRules(rules)
- .returns(11)
- .check();
- }
+ assertQuery("SELECT COUNT(a) FROM test_a_b_s")
+ .disableRules(rules)
+ .returns(7L)
+ .check();
- assertQuery("SELECT MIN(a) FROM testMe")
- .disableRules(rules)
- .returns(10)
- .check();
+ assertQuery("SELECT COUNT(DISTINCT(a)) FROM test_a_b_s")
+ .disableRules(rules)
+ .returns(4L)
+ .check();
- assertQuery("SELECT COUNT(a), COUNT(DISTINCT(a)) FROM testMe")
- .disableRules(rules)
- .returns(7L, 4L)
- .check();
+ assertQuery("SELECT COUNT(a), COUNT(s), COUNT(*) FROM test_a_b_s")
+ .disableRules(rules)
+ .returns(7L, 6L, 7L)
+ .check();
- assertQuery("SELECT COUNT(a), COUNT(DISTINCT a), SUM(a),
SUM(DISTINCT a) FROM testMe")
+ if (Arrays.stream(rules).noneMatch(MAP_REDUCE_RULES::contains)) {
+ assertQuery("SELECT AVG(a) FROM test_a_b_s")
.disableRules(rules)
- .returns(7L, 4L, 80L, 46L)
+ .returns(11)
.check();
- } finally {
- sql("DROP TABLE IF EXISTS testMe");
}
+
+ assertQuery("SELECT MIN(a) FROM test_a_b_s")
+ .disableRules(rules)
+ .returns(10)
+ .check();
+
+ assertQuery("SELECT COUNT(a), COUNT(DISTINCT(a)) FROM test_a_b_s")
+ .disableRules(rules)
+ .returns(7L, 4L)
+ .check();
+
+ assertQuery("SELECT COUNT(a), COUNT(DISTINCT a), SUM(a), SUM(DISTINCT
a) FROM test_a_b_s")
+ .disableRules(rules)
+ .returns(7L, 4L, 80L, 46L)
+ .check();
}
@ParameterizedTest
@MethodSource("provideRules")
- @WithSystemProperty(key = "IMPLICIT_PK_ENABLED", value = "true")
public void checkEmptyTable(String[] rules) {
- sql("CREATE TABLE t (a INTEGER, b INTEGER)");
+ sql("DELETE FROM test_a_b_s");
- try {
- assertQuery("SELECT min(b) FROM t GROUP BY a")
- .disableRules(rules)
- .returnNothing().check();
- } finally {
- sql("DROP TABLE t");
- }
+ assertQuery("SELECT min(b) FROM test_a_b_s GROUP BY a")
+ .disableRules(rules)
+ .returnNothing().check();
}
@ParameterizedTest
@MethodSource("rulesForGroupingSets")
public void testGroupingSets(String[] rules) {
- try {
- sql("CREATE TABLE test1 (id INTEGER PRIMARY KEY, str_col VARCHAR,
int_col INTEGER);");
- sql("INSERT INTO test1 VALUES (1, 's1', 10)");
- sql("INSERT INTO test1 VALUES (2, 's1', 20)");
- sql("INSERT INTO test1 VALUES (3, 's2', 10)");
- sql("INSERT INTO test1 VALUES (4, 's3', 40)");
-
- assertQuery("SELECT str_col, SUM(int_col), COUNT(str_col) FROM
test1 GROUP BY GROUPING SETS "
- + "( (str_col, int_col), (str_col), (int_col), () ) HAVING
SUM(int_col) > 0")
- .disableRules(rules)
- .returns(null, 80L, 4L)
- .returns("s1", 10L, 1L)
- .returns("s3", 40L, 1L)
- .returns("s1", 20L, 1L)
- .returns("s2", 10L, 1L)
- .returns("s2", 10L, 1L)
- .returns("s3", 40L, 1L)
- .returns("s1", 30L, 2L)
- .returns(null, 40L, 1L)
- .returns(null, 20L, 2L)
- .returns(null, 20L, 1L)
- .check();
+ sql("DELETE FROM test_str_int_real_dec");
- } finally {
- sql("DROP TABLE test1");
- }
+ sql("INSERT INTO test_str_int_real_dec(id, str_col, int_col) VALUES
(1, 's1', 10)");
+ sql("INSERT INTO test_str_int_real_dec(id, str_col, int_col) VALUES
(2, 's1', 20)");
+ sql("INSERT INTO test_str_int_real_dec(id, str_col, int_col) VALUES
(3, 's2', 10)");
+ sql("INSERT INTO test_str_int_real_dec(id, str_col, int_col) VALUES
(4, 's3', 40)");
+
+ assertQuery("SELECT str_col, SUM(int_col), COUNT(str_col) FROM
test_str_int_real_dec GROUP BY GROUPING SETS "
+ + "( (str_col, int_col), (str_col), (int_col), () ) HAVING
SUM(int_col) > 0")
+ .disableRules(rules)
+ .returns(null, 80L, 4L)
+ .returns("s1", 10L, 1L)
+ .returns("s3", 40L, 1L)
+ .returns("s1", 20L, 1L)
+ .returns("s2", 10L, 1L)
+ .returns("s2", 10L, 1L)
+ .returns("s3", 40L, 1L)
+ .returns("s1", 30L, 2L)
+ .returns(null, 40L, 1L)
+ .returns(null, 20L, 2L)
+ .returns(null, 20L, 1L)
+ .check();
}
@ParameterizedTest
@MethodSource("rulesForGroupingSets")
public void testDuplicateGroupingSets(String[] rules) {
- try {
- sql("CREATE TABLE test1 (id INTEGER PRIMARY KEY, str_col VARCHAR,
int_col INTEGER);");
- sql("INSERT INTO test1 VALUES (1, 's1', 10)");
- sql("INSERT INTO test1 VALUES (2, 's1', 20)");
- sql("INSERT INTO test1 VALUES (3, 's2', 10)");
+ sql("DELETE FROM test_str_int_real_dec");
- assertQuery("SELECT str_col FROM test1 GROUP BY GROUPING SETS
((str_col), (), (str_col), ()) ORDER BY str_col")
- .disableRules(rules)
- .returns("s1")
- .returns("s2")
- .returns(null)
- .returns("s1")
- .returns("s2")
- .returns(null)
- .check();
- } finally {
- sql("DROP TABLE test1");
- }
+ sql("INSERT INTO test_str_int_real_dec(id, str_col, int_col) VALUES
(1, 's1', 10)");
+ sql("INSERT INTO test_str_int_real_dec(id, str_col, int_col) VALUES
(2, 's1', 20)");
+ sql("INSERT INTO test_str_int_real_dec(id, str_col, int_col) VALUES
(3, 's2', 10)");
+
+ assertQuery("SELECT str_col FROM test_str_int_real_dec GROUP BY
GROUPING SETS ((str_col), (), (str_col), ()) ORDER BY str_col")
+ .disableRules(rules)
+ .returns("s1")
+ .returns("s2")
+ .returns(null)
+ .returns("s1")
+ .returns("s2")
+ .returns(null)
+ .check();
}
@ParameterizedTest
@MethodSource("provideRules")
public void testAvgOnEmptyGroup(String[] rules) {
+ sql("DELETE FROM test_str_int_real_dec");
+
// TODO https://issues.apache.org/jira/browse/IGNITE-20009
// Remove after is fixed.
Assumptions.assumeFalse(Arrays.stream(rules)
.filter(COLO_RULES::contains).count() == COLO_RULES.size(),
"AVG is disabled for MAP/REDUCE");
- try {
- sql("CREATE TABLE test1 (id INTEGER PRIMARY KEY, str_col VARCHAR,
int_col INTEGER, real_col REAL, dec_col DECIMAL);");
+ assertQuery("SELECT AVG(int_col) FROM test_str_int_real_dec")
+ .disableRules(rules)
+ .returns(new Object[]{null})
+ .check();
- assertQuery("SELECT AVG(int_col) FROM test1")
- .disableRules(rules)
- .returns(new Object[]{null})
- .check();
+ assertQuery("SELECT AVG(real_col) FROM test_str_int_real_dec")
+ .disableRules(rules)
+ .returns(new Object[]{null})
+ .check();
- assertQuery("SELECT AVG(real_col) FROM test1")
- .disableRules(rules)
- .returns(new Object[]{null})
- .check();
+ assertQuery("SELECT AVG(dec_col) FROM test_str_int_real_dec")
+ .disableRules(rules)
+ .returns(new Object[]{null})
+ .check();
+ }
- assertQuery("SELECT AVG(dec_col) FROM test1")
- .disableRules(rules)
- .returns(new Object[]{null})
- .check();
- } finally {
- sql("DROP TABLE test1");
- }
+ @ParameterizedTest
+ @MethodSource("provideRules")
+ public void testAggDistinctGroupSet(String[] rules) {
+ sql("DELETE FROM test_a_b_s");
+
+ sql("INSERT INTO test_a_b_s (id, a, b) VALUES (1, 11, 2), (2, 12, 2),
(3, 12, 3)");
+
+ assertQuery("SELECT COUNT(a), COUNT(DISTINCT(b)) FROM test_a_b_s")
+ .disableRules(rules)
+ .returns(3L, 2L)
+ .check();
}
private static Stream<Arguments> rulesForGroupingSets() {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/IgniteMapSortAggregate.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/IgniteMapSortAggregate.java
index d2cab69c0c..0e7ef67d2e 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/IgniteMapSortAggregate.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/IgniteMapSortAggregate.java
@@ -90,7 +90,7 @@ public class IgniteMapSortAggregate extends
IgniteMapAggregateBase implements Ig
List<ImmutableBitSet> groupSets,
List<AggregateCall> aggCalls) {
return new IgniteMapSortAggregate(
- getCluster(), traitSet, input, groupSet, groupSets, aggCalls,
TraitUtils.collation(traitSet));
+ getCluster(), traitSet, input, groupSet, groupSets, aggCalls,
this.collation);
}
/** {@inheritDoc} */
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/IgniteSortAggregateBase.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/IgniteSortAggregateBase.java
index eaac81c14a..d763c49b95 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/IgniteSortAggregateBase.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/IgniteSortAggregateBase.java
@@ -29,12 +29,12 @@ import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
import org.apache.ignite.internal.sql.engine.trait.TraitsAwareIgniteRel;
/**
- * IgniteSortAggregateBase interface.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Defines common methods for {@link IgniteRel relational nodes} that
implement sort-based aggregates.
*/
interface IgniteSortAggregateBase extends TraitsAwareIgniteRel {
/**
@@ -44,7 +44,34 @@ interface IgniteSortAggregateBase extends
TraitsAwareIgniteRel {
*/
ImmutableBitSet getGroupSet();
- /** {@inheritDoc} */
+ /**
+ * Performs propagation of {@link RelCollation collation trait}.
+ *
+ * <p>Sorted aggregates operate on already sorted input to perform
aggregation/grouping.
+ * Because of that, such an operator produces rows sorted by grouping columns,
and it can produce
+ * results that may satisfy the required collation trait (ordering).
+ *
+ * <p>If a grouping set contains all required ordering columns, then
inputs should provide
+ * ordering that includes all required ordering columns + the rest of the
columns used in the grouping set:
+
+ * <pre>
+ * GROUP BY a, b, c ORDER BY a, b
+ * Input collation: (a, b, c)
+ * </pre>
+ *
+ * <p>Otherwise this operator is unable to fully satisfy the required ordering
and we require a collation trait based on
+ * columns from the grouping set, and the rest of the requirements are
going to be satisfied by an enforcer operator:
+ * <pre>
+ * GROUP BY a, b ORDER BY a, b, c
+ * Input collation (a, b)
+ * Enforcer: adds ordering by c
+ * </pre>
+ *
+ * @param nodeTraits Required relational node output traits.
+ * @param inputTraits Traits of input nodes.
+ *
+ * @return Traits satisfied by this expression and traits that input nodes
should satisfy.
+ */
@Override
default Pair<RelTraitSet, List<RelTraitSet>> passThroughCollation(
RelTraitSet nodeTraits, List<RelTraitSet> inputTraits
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/MapReduceAggregates.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/MapReduceAggregates.java
index d539aaacd0..53d4ad0da3 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/MapReduceAggregates.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/MapReduceAggregates.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.AggregateCall;
@@ -39,6 +40,8 @@ import org.apache.calcite.sql.SqlAggFunction;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.mapping.Mapping;
+import org.apache.calcite.util.mapping.Mappings;
import org.apache.ignite.internal.sql.engine.rel.IgniteProject;
import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
@@ -86,9 +89,15 @@ public class MapReduceAggregates {
}
/**
- * Creates a physical operator that implements the given logical aggregate
as MAP/reduce.
+ * Creates a physical operator that implements the given logical aggregate
as MAP/REDUCE.
+ *
+ * @param agg Logical aggregate expression.
+ * @param builder Builder to create implementations of MAP and REDUCE
phases.
+ * @param fieldMappingOnReduce Mapping to be applied to group sets on
REDUCE phase.
+ *
+ * @return A physical node tree that implements the given logical operator.
*/
- public static IgniteRel buildAggregates(LogicalAggregate agg,
AggregateRelBuilder builder) {
+ public static IgniteRel buildAggregates(LogicalAggregate agg,
AggregateRelBuilder builder, Mapping fieldMappingOnReduce) {
//
// To implement MAP/REDUCE aggregate LogicalAggregate is transformed
into
@@ -189,11 +198,17 @@ public class MapReduceAggregates {
assert mapAggCalls.size() <= reduceAggCalls.size() :
format("The number of MAP/REDUCE aggregates is not correct.
MAP: {}\nREDUCE: {}", mapAggCalls, reduceAggCalls);
+ // Apply mapping to groupSet/groupSets on REDUCE phase.
+ ImmutableBitSet groupSetOnReduce =
Mappings.apply(fieldMappingOnReduce, agg.getGroupSet());
+ List<ImmutableBitSet> groupSetsOnReduce = agg.getGroupSets().stream()
+ .map(g -> Mappings.apply(fieldMappingOnReduce, g))
+ .collect(Collectors.toList());
+
IgniteRel reduce = builder.makeReduceAgg(
agg.getCluster(),
map,
- agg.getGroupSet(),
- agg.getGroupSets(),
+ groupSetOnReduce,
+ groupSetsOnReduce,
reduceAggCalls,
reduceTypeToUse
);
@@ -252,7 +267,7 @@ public class MapReduceAggregates {
}
/**
- * Used by {@link #buildAggregates(LogicalAggregate, AggregateRelBuilder)}
+ * Used by {@link #buildAggregates(LogicalAggregate, AggregateRelBuilder,
Mapping)}
* to create MAP/REDUCE aggregate nodes.
*/
public interface AggregateRelBuilder {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateConverterRule.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateConverterRule.java
index d6bb88e10a..b14010d030 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateConverterRule.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateConverterRule.java
@@ -21,7 +21,6 @@ import static
org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates.
import static
org.apache.ignite.internal.sql.engine.util.PlanUtils.complexDistinctAgg;
import java.util.List;
-import java.util.stream.Collectors;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptRule;
@@ -44,7 +43,6 @@ import
org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates.Aggrega
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.sql.engine.util.HintUtils;
-import org.apache.ignite.internal.sql.engine.util.PlanUtils;
/**
* Planner rule that recognizes a {@link org.apache.calcite.rel.core.Aggregate}
@@ -106,6 +104,7 @@ public class HashAggregateConverterRule {
RelOptCluster cluster = agg.getCluster();
RelTraitSet inTrait =
cluster.traitSetOf(IgniteConvention.INSTANCE);
RelTraitSet outTrait =
cluster.traitSetOf(IgniteConvention.INSTANCE);
+ Mapping fieldMappingOnReduce =
Commons.trimmingMapping(agg.getGroupSet().length(), agg.getGroupSet());
AggregateRelBuilder relBuilder = new AggregateRelBuilder() {
@Override
@@ -115,8 +114,8 @@ public class HashAggregateConverterRule {
cluster,
outTrait.replace(IgniteDistributions.random()),
input,
- agg.getGroupSet(),
- agg.getGroupSets(),
+ groupSet,
+ groupSets,
aggregateCalls
);
}
@@ -125,26 +124,19 @@ public class HashAggregateConverterRule {
public IgniteRel makeReduceAgg(RelOptCluster cluster, RelNode
map, ImmutableBitSet groupSet,
List<ImmutableBitSet> groupSets, List<AggregateCall>
aggregateCalls, RelDataType outputType) {
- Mapping mapping =
PlanUtils.computeAggFieldMapping(groupSets);
-
- ImmutableBitSet groupSet1 = Commons.mapBitSet(groupSet,
mapping);
- List<ImmutableBitSet> groupSets1 = groupSets.stream()
- .map(g -> Commons.mapBitSet(g, mapping))
- .collect(Collectors.toList());
-
return new IgniteReduceHashAggregate(
cluster,
outTrait.replace(IgniteDistributions.single()),
convert(map,
inTrait.replace(IgniteDistributions.single())),
- groupSet1,
- groupSets1,
+ groupSet,
+ groupSets,
aggregateCalls,
outputType
);
}
};
- return MapReduceAggregates.buildAggregates(agg, relBuilder);
+ return MapReduceAggregates.buildAggregates(agg, relBuilder,
fieldMappingOnReduce);
}
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/SortAggregateConverterRule.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/SortAggregateConverterRule.java
index d76df11d2a..9a636c39af 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/SortAggregateConverterRule.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/SortAggregateConverterRule.java
@@ -34,6 +34,7 @@ import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.mapping.Mapping;
import org.apache.ignite.internal.sql.engine.rel.IgniteConvention;
import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
import
org.apache.ignite.internal.sql.engine.rel.agg.IgniteColocatedSortAggregate;
@@ -43,6 +44,7 @@ import
org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates;
import
org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates.AggregateRelBuilder;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
+import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.sql.engine.util.HintUtils;
import org.jetbrains.annotations.Nullable;
@@ -118,8 +120,14 @@ public class SortAggregateConverterRule {
RelOptCluster cluster = agg.getCluster();
RelCollation collation =
TraitUtils.createCollation(agg.getGroupSet().asList());
- RelTraitSet inTrait =
cluster.traitSetOf(IgniteConvention.INSTANCE).replace(collation);
- RelTraitSet outTrait =
cluster.traitSetOf(IgniteConvention.INSTANCE).replace(collation);
+ // Create mapping to adjust fields on REDUCE phase.
+ Mapping fieldMappingOnReduce =
Commons.trimmingMapping(agg.getGroupSet().length(), agg.getGroupSet());
+
+ // Adjust columns in output collation.
+ RelCollation outputCollation =
collation.apply(fieldMappingOnReduce);
+
+ RelTraitSet inTraits =
cluster.traitSetOf(IgniteConvention.INSTANCE).replace(collation);
+ RelTraitSet outTraits =
cluster.traitSetOf(IgniteConvention.INSTANCE).replace(outputCollation);
AggregateRelBuilder relBuilder = new AggregateRelBuilder() {
@Override
@@ -128,10 +136,10 @@ public class SortAggregateConverterRule {
return new IgniteMapSortAggregate(
cluster,
- outTrait.replace(IgniteDistributions.random()),
- convert(input,
inTrait.replace(IgniteDistributions.random())),
- agg.getGroupSet(),
- agg.getGroupSets(),
+ outTraits.replace(IgniteDistributions.random()),
+ convert(input,
inTraits.replace(IgniteDistributions.random())),
+ groupSet,
+ groupSets,
aggregateCalls,
collation
);
@@ -143,18 +151,18 @@ public class SortAggregateConverterRule {
return new IgniteReduceSortAggregate(
cluster,
- outTrait.replace(IgniteDistributions.single()),
- convert(map,
inTrait.replace(IgniteDistributions.single())),
- agg.getGroupSet(),
- agg.getGroupSets(),
+ outTraits.replace(IgniteDistributions.single()),
+ convert(map,
outTraits.replace(IgniteDistributions.single())),
+ groupSet,
+ groupSets,
aggregateCalls,
outputType,
- collation
+ outputCollation
);
}
};
- return MapReduceAggregates.buildAggregates(agg, relBuilder);
+ return MapReduceAggregates.buildAggregates(agg, relBuilder,
fieldMappingOnReduce);
}
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/logical/LogicalOrToUnionRule.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/logical/LogicalOrToUnionRule.java
index 2c58e626fc..6174fa4ebc 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/logical/LogicalOrToUnionRule.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/logical/LogicalOrToUnionRule.java
@@ -148,7 +148,7 @@ public class LogicalOrToUnionRule extends
RelRule<LogicalOrToUnionRule.Config> {
}
Mappings.TargetMapping mapping = scan.requiredColumns() == null ? null
:
- Commons.inverseTrimmingMapping(fieldCnt,
scan.requiredColumns());
+ Commons.trimmingMapping(fieldCnt, scan.requiredColumns());
for (RexNode op : operands) {
BitSet conditionFields = new BitSet(fieldCnt);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitsAwareIgniteRel.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitsAwareIgniteRel.java
index 2b4ce030a9..23811d7be9 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitsAwareIgniteRel.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitsAwareIgniteRel.java
@@ -87,7 +87,7 @@ public interface TraitsAwareIgniteRel extends IgniteRel {
*
* @param nodeTraits Relational node output traits.
* @param inTraits Relational node input traits.
- * @return List of possible input-output traits combinations.
+ * @return Traits satisfied by this expression and traits that input nodes
should satisfy.
*/
default Pair<RelTraitSet, List<RelTraitSet>>
passThroughCollation(RelTraitSet nodeTraits, List<RelTraitSet> inTraits) {
if (inTraits.size() > 1) {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
index 91f5984eb9..1bf5f3a86c 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
@@ -454,6 +454,8 @@ public final class Commons {
*
* <p>To find a new index of element after trimming call {@code
mapping.getTargetOpt(index)}.
*
+ * <p>To find an old index of element before trimming call {@code
mapping.getSourceOpt(index)}.
+ *
* <p>This mapping can be used to adjust traits or aggregations, for
example, when several fields have been truncated.
* Assume the following scenario:
* <pre>
@@ -477,62 +479,14 @@ public final class Commons {
* @see org.apache.calcite.plan.RelTrait#apply(TargetMapping)
* @see org.apache.calcite.rel.core.AggregateCall#transform(TargetMapping)
*/
- public static Mappings.TargetMapping trimmingMapping(int sourceSize,
ImmutableBitSet requiredElements) {
- Mapping mapping = Mappings.create(MappingType.PARTIAL_FUNCTION,
sourceSize, requiredElements.cardinality());
+ public static Mapping trimmingMapping(int sourceSize, ImmutableBitSet
requiredElements) {
+ Mapping mapping = Mappings.create(MappingType.INVERSE_SURJECTION,
sourceSize, requiredElements.cardinality());
for (Ord<Integer> ord : Ord.zip(requiredElements)) {
mapping.set(ord.e, ord.i);
}
return mapping;
}
- /**
- * Create a mapping to redo trimming made by {@link #trimmingMapping(int,
ImmutableBitSet)}.
- *
- * <p>To find an old index of element before trimming call {@code
mapping.getSourceOpt(index)}.
- *
- * <p>This mapping can be used to remap traits or aggregates back as if
the trimming has never happened.
- *
- * @param sourceSize Count of elements in a non trimmed collection.
- * @param requiredElements Elements which were preserved during trimming.
- * @return A mapping to restore the original mapping.
- * @see #trimmingMapping(int, ImmutableBitSet)
- */
- public static Mappings.TargetMapping inverseTrimmingMapping(int
sourceSize, ImmutableBitSet requiredElements) {
- return Mappings.invert(trimmingMapping(sourceSize,
requiredElements).inverse());
- }
-
- /**
- * Produces new bitset setting bits according to the given mapping.
- *
- * <pre>
- * bitset:
- * [0, 1, 4]
- * mapping:
- * 1 -> 0
- * 0 -> 1
- * 4 -> 3
- * result:
- * [0, 1, 3]
- * </pre>
- *
- * @param bitset A bitset.
- * @param mapping Mapping to use.
- * @return a transformed bit set.
- */
- public static ImmutableBitSet mapBitSet(ImmutableBitSet bitset, Mapping
mapping) {
- ImmutableBitSet.Builder result = ImmutableBitSet.builder();
-
- int bitPos = bitset.nextSetBit(0);
-
- while (bitPos != -1) {
- int target = mapping.getTarget(bitPos);
- result.set(target);
- bitPos = bitset.nextSetBit(bitPos + 1);
- }
-
- return result.build();
- }
-
/**
* Checks if there is a such permutation of all {@code elems} that is
prefix of provided {@code seq}.
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/PlanUtils.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/PlanUtils.java
index ffae97bf89..24e5e49602 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/PlanUtils.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/PlanUtils.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.sql.engine.util;
-import java.util.BitSet;
import java.util.List;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.type.RelDataType;
@@ -31,8 +30,6 @@ import org.apache.calcite.sql.fun.SqlSumAggFunction;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.mapping.Mapping;
-import org.apache.calcite.util.mapping.MappingType;
-import org.apache.calcite.util.mapping.Mappings;
import org.apache.ignite.internal.sql.engine.exec.exp.agg.Accumulator;
import org.apache.ignite.internal.sql.engine.exec.exp.agg.Accumulators;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
@@ -107,7 +104,8 @@ public class PlanUtils {
public static RelDataType createHashAggRowType(List<ImmutableBitSet>
groupSets,
IgniteTypeFactory typeFactory, RelDataType inputType,
List<AggregateCall> aggregateCalls) {
- Mapping mapping = computeAggFieldMapping(groupSets);
+ ImmutableBitSet fieldIndices = ImmutableBitSet.union(groupSets);
+ Mapping mapping = Commons.trimmingMapping(fieldIndices.length(),
fieldIndices);
RelDataTypeFactory.Builder builder = typeFactory.builder();
@@ -124,40 +122,6 @@ public class PlanUtils {
return builder.build();
}
- /**
- * Creates grouping set keys mapping for REDUCE phase of MAP/REDUCE
aggregates.
- * <pre>
- * group sets:
- * [0], [2, 0], [3]
- * mapping:
- * 0 -> 0
- * 2 -> 1
- * 3 -> 2
- * </pre>
- */
- public static Mapping computeAggFieldMapping(List<ImmutableBitSet>
groupingSets) {
- BitSet fieldIndices = new BitSet();
-
- for (ImmutableBitSet groupingSet : groupingSets) {
- for (int field : groupingSet) {
- fieldIndices.set(field);
- }
- }
-
- Mapping mapping = Mappings.create(MappingType.INVERSE_SURJECTION,
fieldIndices.length(), fieldIndices.cardinality());
-
- int i = 0;
- int bitPos = fieldIndices.nextSetBit(0);
-
- while (bitPos != -1) {
- mapping.set(bitPos, i);
- bitPos = fieldIndices.nextSetBit(bitPos + 1);
- i++;
- }
-
- return mapping;
- }
-
private static void addAccumulatorFields(IgniteTypeFactory typeFactory,
List<AggregateCall> aggregateCalls, Builder builder) {
Accumulators accumulators = new Accumulators(typeFactory);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/RexUtils.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/RexUtils.java
index 606f7d7dc0..d65185813e 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/RexUtils.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/RexUtils.java
@@ -260,7 +260,7 @@ public class RexUtils {
Mappings.TargetMapping mapping = null;
if (requiredColumns != null) {
- mapping = Commons.inverseTrimmingMapping(types.size(),
requiredColumns);
+ mapping = Commons.trimmingMapping(types.size(), requiredColumns);
}
List<SearchBounds> bounds = Arrays.asList(new
SearchBounds[collation.getFieldCollations().size()]);
@@ -337,7 +337,7 @@ public class RexUtils {
Mappings.TargetMapping toTrimmedRowMapping = null;
if (requiredColumns != null) {
- toTrimmedRowMapping = Commons.inverseTrimmingMapping(types.size(),
requiredColumns);
+ toTrimmedRowMapping = Commons.trimmingMapping(types.size(),
requiredColumns);
}
List<RelFieldCollation> fieldCollations =
collation.getFieldCollations();
@@ -392,7 +392,7 @@ public class RexUtils {
Mappings.TargetMapping mapping = null;
if (requiredColumns != null) {
- mapping = Commons.inverseTrimmingMapping(types.size(),
requiredColumns);
+ mapping = Commons.trimmingMapping(types.size(), requiredColumns);
}
for (Entry<List<RexCall>> fld :
fieldsToPredicates.int2ObjectEntrySet()) {
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateExecutionTest.java
index 3ee52dd67a..daaa6e7f25 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateExecutionTest.java
@@ -37,7 +37,7 @@ import
org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
import org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates;
import
org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates.MapReduceAgg;
-import org.apache.ignite.internal.sql.engine.util.PlanUtils;
+import org.apache.ignite.internal.sql.engine.util.Commons;
/**
* HashAggregateExecutionTest.
@@ -116,7 +116,8 @@ public class HashAggregateExecutionTest extends
BaseAggregateTest {
aggMap.register(scan);
- Mapping reduceMapping = PlanUtils.computeAggFieldMapping(grpSets);
+ ImmutableBitSet grpSet = grpSets.get(0);
+ Mapping reduceMapping = Commons.trimmingMapping(grpSet.length(),
grpSet);
MapReduceAgg mapReduceAgg =
MapReduceAggregates.createMapReduceAggCall(call,
reduceMapping.getTargetCount());
HashAggregateNode<Object[]> aggRdc = new HashAggregateNode<>(
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/SortAggregateExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/SortAggregateExecutionTest.java
index 7040cd02a3..f40ccda15c 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/SortAggregateExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/SortAggregateExecutionTest.java
@@ -37,7 +37,7 @@ import
org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
import org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates;
import
org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates.MapReduceAgg;
-import org.apache.ignite.internal.sql.engine.util.PlanUtils;
+import org.apache.ignite.internal.sql.engine.util.Commons;
/**
* SortAggregateExecutionTest.
@@ -124,7 +124,6 @@ public class SortAggregateExecutionTest extends
BaseAggregateTest {
// The group's fields placed on the begin of the output row (planner
// does this by Projection node for aggregate input).
- // Hash aggregate doesn't use groups set on reducer because send
GroupKey as object.
ImmutableIntList reduceGrpFields = ImmutableIntList.copyOf(
IntStream.range(0,
grpSet.cardinality()).boxed().collect(Collectors.toList())
);
@@ -137,8 +136,8 @@ public class SortAggregateExecutionTest extends
BaseAggregateTest {
rdcCmp = (k1, k2) -> 0;
}
- Mapping reduceMapping = PlanUtils.computeAggFieldMapping(grpSets);
- MapReduceAgg mapReduceAgg =
MapReduceAggregates.createMapReduceAggCall(call,
reduceMapping.getTargetCount());
+ Mapping mapping = Commons.trimmingMapping(grpSet.length(), grpSet);
+ MapReduceAgg mapReduceAgg =
MapReduceAggregates.createMapReduceAggCall(call, mapping.getTargetCount());
SortAggregateNode<Object[]> aggRdc = new SortAggregateNode<>(
ctx,
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractAggregatePlannerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractAggregatePlannerTest.java
index f268ac7e49..09e176efe0 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractAggregatePlannerTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractAggregatePlannerTest.java
@@ -29,8 +29,10 @@ import java.util.Objects;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
+import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.internal.schema.NativeTypes;
import
org.apache.ignite.internal.sql.engine.framework.TestBuilders.TableBuilder;
import org.apache.ignite.internal.sql.engine.rel.IgniteAggregate;
@@ -39,6 +41,7 @@ import
org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Collation;
import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
@@ -847,6 +850,48 @@ public abstract class AbstractAggregatePlannerTest extends
AbstractPlannerTest {
* <p>Distribution identity(1)
*/
CASE_23C("SELECT val0, AVG(val1) FROM test GROUP BY val0",
schema(identity(1))),
+
+ /**
+ * Query: SELECT COUNT(val0), COUNT(DISTINCT(val1)) from test.
+ *
+ * <p>Distribution single()
+ */
+ CASE_24_1("SELECT COUNT(val0), COUNT(DISTINCT(val1)) from test",
schema(single())),
+
+ /**
+ * Query: SELECT COUNT(val0), COUNT(DISTINCT(val1)) from test.
+ *
+ * <p>Distribution hash(0)
+ */
+ CASE_24_1A("SELECT COUNT(val0), COUNT(DISTINCT(val1)) from test",
schema(hash(0))),
+
+ /**
+ * Query: SELECT COUNT(val0), COUNT(DISTINCT(val1)) from test.
+ *
+ * <p>Distribution hash(1)
+ */
+ CASE_24_1B("SELECT COUNT(val0), COUNT(DISTINCT(val1)) from test",
schema(hash(1))),
+
+ /**
+ * Query: SELECT COUNT(val0), COUNT(DISTINCT(val1)) from test.
+ *
+ * <p>Distribution hash(2)
+ */
+ CASE_24_1C("SELECT COUNT(val0), COUNT(DISTINCT(val1)) from test",
schema(hash(2))),
+
+ /**
+ * Query: SELECT COUNT(val0), COUNT(DISTINCT(val1)) from test.
+ *
+ * <p>Distribution identity(1)
+ */
+ CASE_24_1D("SELECT COUNT(val0), COUNT(DISTINCT(val1)) from test",
schema(identity(1))),
+
+ /**
+ * Query: SELECT COUNT(val0), COUNT(DISTINCT(val1)) from test.
+ *
+ * <p>Distribution identity(2)
+ */
+ CASE_24_1E("SELECT COUNT(val0), COUNT(DISTINCT(val1)) from test",
schema(identity(2))),
;
final String query;
@@ -993,4 +1038,31 @@ public abstract class AbstractAggregatePlannerTest
extends AbstractPlannerTest {
return true;
};
}
+
+ <T> Predicate<T> hasGroupSets(Function<T, List<ImmutableBitSet>>
groupSets, int groupKey) {
+ return (node) -> {
+ List<ImmutableBitSet> allGroupSets = groupSets.apply(node);
+ ImmutableBitSet firstGroupSet = allGroupSets.get(0);
+
+ boolean groupSetsMatch =
allGroupSets.equals(List.of(ImmutableBitSet.of(groupKey)));
+ boolean groupSetMatches =
firstGroupSet.equals(ImmutableBitSet.of(groupKey));
+
+ return groupSetMatches && groupSetsMatch;
+ };
+ }
+
+ <T> Predicate<T> hasNoGroupSets(Function<T, List<ImmutableBitSet>>
groupSets) {
+ return (node) -> {
+ List<ImmutableBitSet> allGroupSets = groupSets.apply(node);
+ List<ImmutableBitSet> emptyGroupSets =
List.of(ImmutableBitSet.of());
+ return emptyGroupSets.equals(allGroupSets);
+ };
+ }
+
+ <T extends RelNode> Predicate<T> hasCollation(RelCollation expected) {
+ return (node) -> {
+ RelCollation collation = TraitUtils.collation(node);
+ return expected.equals(collation);
+ };
+ }
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
index 8842af8cfc..11f3d46a9e 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
@@ -30,6 +30,7 @@ import
org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
import org.apache.ignite.internal.sql.engine.rel.IgniteLimit;
import org.apache.ignite.internal.sql.engine.rel.IgniteMergeJoin;
import org.apache.ignite.internal.sql.engine.rel.IgniteSort;
+import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
import
org.apache.ignite.internal.sql.engine.rel.agg.IgniteColocatedHashAggregate;
import
org.apache.ignite.internal.sql.engine.rel.agg.IgniteColocatedSortAggregate;
import org.apache.ignite.internal.sql.engine.rel.agg.IgniteMapHashAggregate;
@@ -515,6 +516,45 @@ public class AggregatePlannerTest extends
AbstractAggregatePlannerTest {
assertPlan(TestCase.CASE_23C, colocatedGroupBy);
}
+ /**
+ * Validates a plan for a query with two aggregates: one w/o DISTINCT and
one with DISTINCT: single distribution.
+ */
+ @Test
+ public void countDistinctGroupSetSingle() throws Exception {
+ assertPlan(TestCase.CASE_24_1,
isInstanceOf(IgniteColocatedHashAggregate.class)
+
.and(hasNoGroupSets(IgniteColocatedHashAggregate::getGroupSets))
+ .and(input(isInstanceOf(IgniteTableScan.class)
+ )));
+ }
+
+ /**
+ * Validates a plan for a query with two aggregates: one w/o DISTINCT and
one with DISTINCT: hash distribution.
+ */
+ @Test
+ public void countDistinctGroupSetHash() throws Exception {
+ checkCountDistinctHash(TestCase.CASE_24_1A);
+ checkCountDistinctHash(TestCase.CASE_24_1B);
+ checkCountDistinctHash(TestCase.CASE_24_1D);
+
+ Predicate<RelNode> colocated =
nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
+ .and(hasNoGroupSets(IgniteReduceSortAggregate::getGroupSets))
+ .and(input(isInstanceOf(IgniteExchange.class)
+ .and(hasDistribution(IgniteDistributions.single())
+
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
+
.and(hasNoGroupSets(IgniteMapSortAggregate::getGroupSets))
+
.and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
+
.and(hasGroupSets(IgniteColocatedHashAggregate::getGroupSets, 1))
+ ))
+ ))
+ )
+ ))
+
+ );
+
+ assertPlan(TestCase.CASE_24_1C, colocated);
+ assertPlan(TestCase.CASE_24_1E, colocated);
+ }
+
private void checkSimpleAggSingle(TestCase testCase) throws Exception {
assertPlan(testCase,
nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class)
@@ -769,4 +809,19 @@ public class AggregatePlannerTest extends
AbstractAggregatePlannerTest {
))
);
}
+
+ private void checkCountDistinctHash(TestCase testCase) throws Exception {
+ assertPlan(testCase,
nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class)
+
.and(nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class)
+
.and(hasGroupSets(IgniteReduceHashAggregate::getGroupSets, 0))
+ .and(input(isInstanceOf(IgniteExchange.class)
+
.and(hasDistribution(IgniteDistributions.single())
+
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
+
.and(hasGroupSets(IgniteMapHashAggregate::getGroupSets, 1))
+ ))
+ )
+ ))
+ ))
+ ));
+ }
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java
index 54fe13c0bb..eba053ffab 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java
@@ -30,6 +30,7 @@ import
org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
import org.apache.ignite.internal.sql.engine.rel.IgniteLimit;
import org.apache.ignite.internal.sql.engine.rel.IgniteMergeJoin;
import org.apache.ignite.internal.sql.engine.rel.IgniteSort;
+import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
import
org.apache.ignite.internal.sql.engine.rel.agg.IgniteColocatedHashAggregate;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
@@ -459,6 +460,40 @@ public class ColocatedHashAggregatePlannerTest extends
AbstractAggregatePlannerT
}
+ /**
+ * Validates a plan for a query with two aggregates: one w/o DISTINCT and
one with DISTINCT: single distribution.
+ */
+ @Test
+ public void countDistinctGroupSetSingle() throws Exception {
+ assertPlan(TestCase.CASE_24_1,
isInstanceOf(IgniteColocatedHashAggregate.class)
+
.and(hasNoGroupSets(IgniteColocatedHashAggregate::getGroupSets))
+ .and(input(isInstanceOf(IgniteTableScan.class)
+ )), disableRules);
+ }
+
+ /**
+ * Validates a plan for a query with two aggregates: one w/o DISTINCT and
one with DISTINCT: hash distribution.
+ */
+ @Test
+ public void countDistinctGroupSetHash() throws Exception {
+ checkCountDistinctHash(TestCase.CASE_24_1A);
+ checkCountDistinctHash(TestCase.CASE_24_1B);
+ checkCountDistinctHash(TestCase.CASE_24_1D);
+
+ Predicate<RelNode> colocated2 =
nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class)
+
.and(hasNoGroupSets(IgniteColocatedHashAggregate::getGroupSets))
+ .and(input(isInstanceOf(IgniteExchange.class)
+ .and(hasDistribution(IgniteDistributions.single()))
+
.and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
+
.and(hasGroupSets(IgniteColocatedHashAggregate::getGroupSets, 1))
+
.and(input(isInstanceOf(IgniteTableScan.class))))
+ ))
+ ));
+
+ assertPlan(TestCase.CASE_24_1C, colocated2, disableRules);
+ assertPlan(TestCase.CASE_24_1E, colocated2, disableRules);
+ }
+
private void checkSimpleAggSingle(TestCase testCase) throws Exception {
assertPlan(testCase,
nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class)
@@ -690,4 +725,13 @@ public class ColocatedHashAggregatePlannerTest extends
AbstractAggregatePlannerT
disableRules
);
}
+
+ private void checkCountDistinctHash(TestCase testCase) throws Exception {
+ assertPlan(testCase, isInstanceOf(IgniteColocatedHashAggregate.class)
+
.and(hasNoGroupSets(IgniteColocatedHashAggregate::getGroupSets))
+ .and(input(isInstanceOf(IgniteExchange.class)
+ .and(hasDistribution(IgniteDistributions.single()))
+ .and(input(isInstanceOf(IgniteTableScan.class))))
+ ), disableRules);
+ }
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java
index 4c66544ec5..81f586e5f9 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java
@@ -30,6 +30,7 @@ import
org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
import org.apache.ignite.internal.sql.engine.rel.IgniteLimit;
import org.apache.ignite.internal.sql.engine.rel.IgniteMergeJoin;
import org.apache.ignite.internal.sql.engine.rel.IgniteSort;
+import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
import
org.apache.ignite.internal.sql.engine.rel.agg.IgniteColocatedSortAggregate;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
@@ -461,6 +462,29 @@ public class ColocatedSortAggregatePlannerTest extends
AbstractAggregatePlannerT
assertPlan(TestCase.CASE_23C, checkPlan, disableRules);
}
+ /**
+ * Validate query with two aggregates: one w/o DISTINCT and one with
DISTINCT.
+ */
+ @Test
+ public void countDistinctGroupSetSingle() throws Exception {
+ assertPlan(TestCase.CASE_24_1,
isInstanceOf(IgniteColocatedSortAggregate.class)
+
.and(hasNoGroupSets(IgniteColocatedSortAggregate::getGroupSets))
+ .and(input(isInstanceOf(IgniteTableScan.class)
+ )), disableRules);
+ }
+
+ /**
+ * Validate query with two aggregates: one w/o DISTINCT and one with
DISTINCT: hash distribution.
+ */
+ @Test
+ public void countDistinctGroupSetHash() throws Exception {
+ checkCountDistinctHash(TestCase.CASE_24_1A);
+ checkCountDistinctHash(TestCase.CASE_24_1B);
+ checkCountDistinctHash(TestCase.CASE_24_1C);
+ checkCountDistinctHash(TestCase.CASE_24_1D);
+ checkCountDistinctHash(TestCase.CASE_24_1E);
+ }
+
private void checkSimpleAggSingle(TestCase testCase) throws Exception {
assertPlan(testCase,
nodeOrAnyChild(isInstanceOf(IgniteColocatedSortAggregate.class)
@@ -726,4 +750,13 @@ public class ColocatedSortAggregatePlannerTest extends
AbstractAggregatePlannerT
disableRules
);
}
+
+ private void checkCountDistinctHash(TestCase testCase) throws Exception {
+ assertPlan(testCase, isInstanceOf(IgniteColocatedSortAggregate.class)
+
.and(hasNoGroupSets(IgniteColocatedSortAggregate::getGroupSets))
+ .and(input(isInstanceOf(IgniteExchange.class)
+ .and(hasDistribution(IgniteDistributions.single()))
+ .and(input(isInstanceOf(IgniteTableScan.class))))
+ ), disableRules);
+ }
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceAggregatesTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceAggregatesTest.java
new file mode 100644
index 0000000000..fc8053ac4c
--- /dev/null
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceAggregatesTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.planner;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalValues;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.mapping.Mapping;
+import org.apache.calcite.util.mapping.MappingType;
+import org.apache.calcite.util.mapping.Mappings;
+import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
+import org.apache.ignite.internal.sql.engine.rel.IgniteValues;
+import org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates;
+import
org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates.AggregateRelBuilder;
+import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
+import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link MapReduceAggregates}.
+ */
+public class MapReduceAggregatesTest {
+
+ /**
+ * Checks that mapping is applied to {@code groupSet} and {@code
groupSets} arguments to methods of {@link AggregateRelBuilder}.
+ */
+ @Test
+ public void testGroupSetMapping() {
+ IgniteTypeFactory typeFactory = Commons.typeFactory();
+
+ RelDataType rowType = typeFactory.builder().add("f1",
SqlTypeName.INTEGER)
+ .add("f2", SqlTypeName.INTEGER)
+ .build();
+
+ RelOptCluster cluster = Commons.cluster();
+ RelTraitSet traitSet = RelTraitSet.createEmpty();
+
+ // Input to aggregate
+ LogicalValues values = new LogicalValues(cluster, traitSet, rowType,
ImmutableList.of());
+
+ // Aggregate
+ ImmutableBitSet groupSet = ImmutableBitSet.of(1);
+ AggregateCall aggregateCall = newCall(typeFactory, List.of(1));
+
+ LogicalAggregate aggregate = new LogicalAggregate(cluster, traitSet,
List.of(), values,
+ groupSet, List.of(groupSet), List.of(aggregateCall));
+
+ Mapping mapping = Mappings.create(MappingType.PARTIAL_FUNCTION, 2, 5);
+ mapping.set(1, 3);
+
+ GroupSetCollector collect = new GroupSetCollector(rowType);
+
+ MapReduceAggregates.buildAggregates(aggregate, collect, mapping);
+
+ Pair<ImmutableBitSet, List<ImmutableBitSet>> mapGroups =
collect.collectedGroupSets.get(0);
+ Pair<ImmutableBitSet, List<ImmutableBitSet>> reduceGroups =
collect.collectedGroupSets.get(1);
+ ImmutableBitSet mappedGroupSet = Mappings.apply(mapping, groupSet);
+
+ assertEquals(Pair.of(groupSet, List.of(groupSet)), mapGroups, "group
sets on MAP phase");
+ assertEquals(Pair.of(mappedGroupSet, List.of(mappedGroupSet)),
reduceGroups, "group sets on REDUCE phase");
+ }
+
+ private static AggregateCall newCall(IgniteTypeFactory typeFactory,
List<Integer> args) {
+ return AggregateCall.create(SqlStdOperatorTable.COUNT,
+ false, false, false, args, -1, null,
+ RelCollations.EMPTY,
+ typeFactory.createSqlType(SqlTypeName.BIGINT),
+ "count");
+ }
+
+ private static class GroupSetCollector implements AggregateRelBuilder {
+ private final List<Pair<ImmutableBitSet, List<ImmutableBitSet>>>
collectedGroupSets = new ArrayList<>();
+ private final RelDataType rowType;
+
+ GroupSetCollector(RelDataType rowType) {
+ this.rowType = rowType;
+ }
+
+ @Override
+ public IgniteRel makeMapAgg(RelOptCluster cluster,
+ RelNode input,
+ ImmutableBitSet groupSet,
+ List<ImmutableBitSet> groupSets,
+ List<AggregateCall> aggregateCalls) {
+
+ collectedGroupSets.add(Pair.of(groupSet, groupSets));
+
+ return createOutExpr(cluster, input);
+ }
+
+ @Override
+ public IgniteRel makeReduceAgg(RelOptCluster cluster,
+ RelNode map,
+ ImmutableBitSet groupSet,
+ List<ImmutableBitSet> groupSets,
+ List<AggregateCall> aggregateCalls,
+ RelDataType outputType) {
+
+ collectedGroupSets.add(Pair.of(groupSet, groupSets));
+
+ return createOutExpr(cluster, map);
+ }
+
+ private IgniteValues createOutExpr(RelOptCluster cluster, RelNode
input) {
+ // Expression can be of any type and implementation of
MapReduceAggregates should not probe into it.
+ return new IgniteValues(cluster, rowType, ImmutableList.of(),
input.getTraitSet());
+ }
+ }
+}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java
index a525b719eb..392d56843b 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java
@@ -27,6 +27,7 @@ import java.util.Objects;
import java.util.function.Predicate;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.AggregateCall;
import
org.apache.ignite.internal.sql.engine.rel.IgniteCorrelatedNestedLoopJoin;
import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
@@ -37,6 +38,7 @@ import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
import org.apache.ignite.internal.sql.engine.rel.IgniteSort;
import org.apache.ignite.internal.sql.engine.rel.agg.IgniteMapHashAggregate;
import org.apache.ignite.internal.sql.engine.rel.agg.IgniteReduceHashAggregate;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
import org.apache.ignite.internal.util.ArrayUtils;
import org.junit.jupiter.api.Test;
@@ -512,6 +514,39 @@ public class MapReduceHashAggregatePlannerTest extends
AbstractAggregatePlannerT
assertThat(e.getMessage(), containsString("There are not enough rules
to produce a node with desired properties"));
}
+ /**
+ * Validates a plan for a query with two aggregates: one w/o DISTINCT and
one with DISTINCT: single distribution.
+ */
+ @Test
+ public void countDistinctGroupSetSingle() throws Exception {
+ Predicate<IgniteReduceHashAggregate> inputAgg =
isInstanceOf(IgniteReduceHashAggregate.class)
+ .and(hasGroupSets(IgniteReduceHashAggregate::getGroupSets, 0))
+ .and(input(isInstanceOf(IgniteMapHashAggregate.class)
+ .and(hasGroupSets(Aggregate::getGroupSets, 1))
+ ));
+
+ assertPlan(TestCase.CASE_24_1,
nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class)
+
.and(hasNoGroupSets(IgniteReduceHashAggregate::getGroupSets))
+ .and(input(isInstanceOf(IgniteMapHashAggregate.class)
+
.and(hasNoGroupSets(IgniteMapHashAggregate::getGroupSets))
+
.and(input(isInstanceOf(IgniteProject.class).and(input(inputAgg)))
+ ))
+ )),
+ disableRules);
+ }
+
+ /**
+ * Validates a plan for a query with two aggregates: one w/o DISTINCT and
one with DISTINCT: hash distribution.
+ */
+ @Test
+ public void countDistinctGroupSetHash() throws Exception {
+ checkCountDistinctHash(TestCase.CASE_24_1A);
+ checkCountDistinctHash(TestCase.CASE_24_1B);
+ checkCountDistinctHash(TestCase.CASE_24_1C);
+ checkCountDistinctHash(TestCase.CASE_24_1D);
+ checkCountDistinctHash(TestCase.CASE_24_1E);
+ }
+
private void checkSimpleAggSingle(TestCase testCase) throws Exception {
assertPlan(testCase,
nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class)
@@ -746,4 +781,25 @@ public class MapReduceHashAggregatePlannerTest extends
AbstractAggregatePlannerT
disableRules
);
}
+
+
+ private void checkCountDistinctHash(TestCase testCase) throws Exception {
+ Predicate<IgniteReduceHashAggregate> inputAgg =
isInstanceOf(IgniteReduceHashAggregate.class)
+ .and(hasGroupSets(IgniteReduceHashAggregate::getGroupSets, 0))
+ .and(input(isInstanceOf(IgniteExchange.class)
+ .and(hasDistribution(IgniteDistributions.single()))
+ .and(input(isInstanceOf(IgniteMapHashAggregate.class)
+ .and(hasGroupSets(Aggregate::getGroupSets, 1))
+ ))
+ ));
+
+ assertPlan(testCase,
nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class)
+
.and(hasNoGroupSets(IgniteReduceHashAggregate::getGroupSets))
+ .and(input(isInstanceOf(IgniteMapHashAggregate.class)
+
.and(hasNoGroupSets(IgniteMapHashAggregate::getGroupSets))
+
.and(input(isInstanceOf(IgniteProject.class).and(input(inputAgg)))
+ ))
+ )),
+ disableRules);
+ }
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java
index a02267ab11..e39bc01c44 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java
@@ -26,7 +26,9 @@ import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;
import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.AggregateCall;
import
org.apache.ignite.internal.sql.engine.rel.IgniteCorrelatedNestedLoopJoin;
import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
@@ -37,6 +39,7 @@ import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
import org.apache.ignite.internal.sql.engine.rel.IgniteSort;
import org.apache.ignite.internal.sql.engine.rel.agg.IgniteMapSortAggregate;
import org.apache.ignite.internal.sql.engine.rel.agg.IgniteReduceSortAggregate;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
import org.apache.ignite.internal.util.ArrayUtils;
import org.junit.jupiter.api.Test;
@@ -494,6 +497,46 @@ public class MapReduceSortAggregatePlannerTest extends
AbstractAggregatePlannerT
assertThat(e.getMessage(), containsString("There are not enough rules
to produce a node with desired properties"));
}
+ /**
+ * Validates a plan for a query with two aggregates: one w/o DISTINCT and
one with DISTINCT: single distribution.
+ */
+ @Test
+ public void countDistinctGroupSetSingle() throws Exception {
+ Predicate<IgniteReduceSortAggregate> inputAgg =
isInstanceOf(IgniteReduceSortAggregate.class)
+ .and(hasGroupSets(IgniteReduceSortAggregate::getGroupSets, 0))
+ .and(hasCollation(RelCollations.of(0)))
+ .and(input(isInstanceOf(IgniteExchange.class)
+ .and(hasCollation(RelCollations.of(0)))
+ .and(hasDistribution(IgniteDistributions.single()))
+ .and(input(isInstanceOf(IgniteMapSortAggregate.class)
+ .and(hasCollation(RelCollations.of(1)))
+ .and(hasGroupSets(Aggregate::getGroupSets, 1))
+ ))));
+
+ assertPlan(TestCase.CASE_24_1,
nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
+
.and(hasNoGroupSets(IgniteReduceSortAggregate::getGroupSets))
+ .and(hasCollation(RelCollations.EMPTY))
+ .and(input(isInstanceOf(IgniteMapSortAggregate.class)
+
.and(hasNoGroupSets(IgniteMapSortAggregate::getGroupSets))
+ .and(hasCollation(RelCollations.EMPTY))
+
.and(input(isInstanceOf(IgniteProject.class).and(input(inputAgg)))
+ ))
+ )),
+ disableRules);
+ }
+
+ /**
+ * Validates a plan for a query with two aggregates: one w/o DISTINCT and
one with DISTINCT: hash distribution.
+ */
+ @Test
+ public void countDistinctGroupSetHash() throws Exception {
+ checkCountDistinctHash(TestCase.CASE_24_1A);
+ checkCountDistinctHash(TestCase.CASE_24_1B);
+ checkCountDistinctHash(TestCase.CASE_24_1C);
+ checkCountDistinctHash(TestCase.CASE_24_1D);
+ checkCountDistinctHash(TestCase.CASE_24_1E);
+ }
+
private void checkSimpleAggSingle(TestCase testCase) throws Exception {
assertPlan(testCase,
nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
@@ -821,4 +864,28 @@ public class MapReduceSortAggregatePlannerTest extends
AbstractAggregatePlannerT
disableRules
);
}
+
+ private void checkCountDistinctHash(TestCase testCase) throws Exception {
+ Predicate<IgniteReduceSortAggregate> inputAgg =
isInstanceOf(IgniteReduceSortAggregate.class)
+ .and(hasGroupSets(IgniteReduceSortAggregate::getGroupSets, 0))
+ .and(hasCollation(RelCollations.of(0)))
+ .and(input(isInstanceOf(IgniteExchange.class)
+ .and(hasDistribution(IgniteDistributions.single()))
+ .and(hasCollation(RelCollations.of(0)))
+ .and(input(isInstanceOf(IgniteMapSortAggregate.class)
+ .and(hasGroupSets(Aggregate::getGroupSets, 1))
+ ))
+ ));
+
+ assertPlan(testCase,
nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
+
.and(hasNoGroupSets(IgniteReduceSortAggregate::getGroupSets))
+ .and(hasCollation(RelCollations.EMPTY))
+ .and(input(isInstanceOf(IgniteMapSortAggregate.class)
+
.and(hasNoGroupSets(IgniteMapSortAggregate::getGroupSets))
+ .and(hasCollation(RelCollations.EMPTY))
+
.and(input(isInstanceOf(IgniteProject.class).and(input(inputAgg)))
+ ))
+ )),
+ disableRules);
+ }
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/CommonsTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/CommonsTest.java
index 5c8de83374..344e94e021 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/CommonsTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/CommonsTest.java
@@ -21,7 +21,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.mapping.Mapping;
-import org.apache.calcite.util.mapping.MappingType;
import org.apache.calcite.util.mapping.Mappings;
import org.junit.jupiter.api.Test;
@@ -31,24 +30,19 @@ import org.junit.jupiter.api.Test;
public class CommonsTest {
@Test
- public void testMapBitSet() {
- Mapping mapping = Mappings.create(MappingType.PARTIAL_FUNCTION, 7, 7);
- mapping.set(0, 1);
- mapping.set(4, 6);
- mapping.set(1, 4);
+ public void testTrimmingMapping() {
+ ImmutableBitSet requiredElements = ImmutableBitSet.of(1, 4, 3, 7);
- ImmutableBitSet bitSet = ImmutableBitSet.of(1, 0, 4);
-
- ImmutableBitSet actual = Commons.mapBitSet(bitSet, mapping);
- assertEquals(ImmutableBitSet.of(1, 4, 6), actual);
+ Mapping mapping = Commons.trimmingMapping(requiredElements.length(),
requiredElements);
+ expectMapped(mapping, ImmutableBitSet.of(1, 7), ImmutableBitSet.of(0,
3));
+ expectMapped(mapping, ImmutableBitSet.of(3), ImmutableBitSet.of(1));
+ expectMapped(mapping, ImmutableBitSet.of(1, 4, 3, 7),
ImmutableBitSet.of(0, 1, 2, 3));
}
- @Test
- public void testMapEmptyBitSet() {
- Mapping mapping = Mappings.createIdentity(1);
- ImmutableBitSet bitSet = ImmutableBitSet.of();
+ private static void expectMapped(Mapping mapping, ImmutableBitSet bitSet,
ImmutableBitSet expected) {
+ assertEquals(expected, Mappings.apply(mapping, bitSet), "direct
mapping");
- ImmutableBitSet actual = Commons.mapBitSet(bitSet, mapping);
- assertEquals(ImmutableBitSet.of(), actual);
+ Mapping inverseMapping = mapping.inverse();
+ assertEquals(bitSet, Mappings.apply(inverseMapping, expected),
"inverse mapping");
}
}