This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 3cb914dd7a IGNITE-21580 Sql. Optimise query plans when using two phase aggregates (#3552)
3cb914dd7a is described below
commit 3cb914dd7ad16a879248c71545ed8083f3d80f32
Author: Andrew V. Mashenkov <[email protected]>
AuthorDate: Mon Apr 8 13:44:38 2024 +0300
IGNITE-21580 Sql. Optimise query plans when using two phase aggregates (#3552)
---
.../internal/sql/engine/ItAggregatesTest.java | 2 +-
.../engine/metadata/IgniteMdDistinctRowCount.java | 30 ++-
.../internal/sql/engine/rel/IgniteAggregate.java | 22 +-
.../internal/sql/engine/trait/TraitUtils.java | 8 +-
.../planner/AbstractAggregatePlannerTest.java | 30 ---
.../sql/engine/planner/AbstractPlannerTest.java | 40 +++-
.../sql/engine/planner/AggregatePlannerTest.java | 236 ++++++++++-----------
.../planner/ColocatedHashAggregatePlannerTest.java | 18 +-
.../planner/MapReduceHashAggregatePlannerTest.java | 92 ++++----
.../planner/MapReduceSortAggregatePlannerTest.java | 28 +--
.../sql/engine/planner/TpchQueryPlannerTest.java | 121 +++++++++++
.../src/test/resources/mapping/correlated.test | 15 +-
.../resources/mapping/test_partition_pruning.test | 10 +-
.../src/test/resources/mapping/union.test | 50 ++---
.../src/test/resources/tpch/plan/q1.plan | 7 +
15 files changed, 429 insertions(+), 280 deletions(-)
diff --git
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItAggregatesTest.java
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItAggregatesTest.java
index f3e12c117a..ae6b4262d6 100644
---
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItAggregatesTest.java
+++
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItAggregatesTest.java
@@ -374,7 +374,7 @@ public class ItAggregatesTest extends
BaseSqlIntegrationTest {
var sql = "select distinct name from person";
assertQuery(sql)
-
.matches(QueryChecker.matches(".*Colocated.*Aggregate.*Exchange.*"))
+
.matches(QueryChecker.matches(".*ReduceHashAggregate.*Exchange.*MapHashAggregate.*"))
.returns("Igor")
.returns("Ilya")
.returns("Roma")
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/IgniteMdDistinctRowCount.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/IgniteMdDistinctRowCount.java
index 9530ee5059..9b9654631f 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/IgniteMdDistinctRowCount.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/IgniteMdDistinctRowCount.java
@@ -18,6 +18,9 @@
package org.apache.ignite.internal.sql.engine.metadata;
import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.metadata.CyclicMetadataException;
import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMdDistinctRowCount;
import org.apache.calcite.rel.metadata.RelMetadataProvider;
@@ -25,6 +28,8 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.util.BuiltInMethod;
import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.NumberUtil;
+import org.checkerframework.checker.nullness.qual.Nullable;
/**
* IgniteMdDistinctRowCount.
@@ -36,6 +41,11 @@ public class IgniteMdDistinctRowCount extends
RelMdDistinctRowCount {
ReflectiveRelMetadataProvider.reflectiveSource(
BuiltInMethod.DISTINCT_ROW_COUNT.method, new
IgniteMdDistinctRowCount());
+ @Override
+ public Double getDistinctRowCount(Aggregate rel, RelMetadataQuery mq,
ImmutableBitSet groupKey, @Nullable RexNode predicate) {
+ return rel.estimateRowCount(mq);
+ }
+
/** {@inheritDoc} */
@Override
public Double getDistinctRowCount(
@@ -44,14 +54,22 @@ public class IgniteMdDistinctRowCount extends
RelMdDistinctRowCount {
ImmutableBitSet groupKey,
RexNode predicate
) {
- if (groupKey.cardinality() == 0) {
- return 1d;
+ RelNode best = rel.getBest();
+ if (best != null) {
+ return mq.getDistinctRowCount(best, groupKey, predicate);
}
- double rowCount = mq.getRowCount(rel);
-
- rowCount *= 1.0 - Math.pow(.5, groupKey.cardinality());
+ Double d = null;
+ for (RelNode r2 : rel.getRels()) {
+ try {
+ Double d2 = mq.getDistinctRowCount(r2, groupKey, predicate);
+ d = NumberUtil.min(d, d2);
+ } catch (CyclicMetadataException e) {
+ // Ignore this relational expression; there will be non-cyclic ones
+ // in this set.
+ }
+ }
- return rowCount;
+ return d;
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteAggregate.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteAggregate.java
index 637d2aca19..0447d3945b 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteAggregate.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteAggregate.java
@@ -64,9 +64,7 @@ public abstract class IgniteAggregate extends Aggregate
implements IgniteRel {
return groupsCnt;
}
- // Estimation of the groups count is not available.
- // Use heuristic estimation for result rows count.
- return super.estimateRowCount(mq);
+ return guessDistinctRows(mq, groupSet.cardinality());
}
/**
@@ -78,11 +76,14 @@ public abstract class IgniteAggregate extends Aggregate
implements IgniteRel {
if (!aggCalls.isEmpty()) {
double grps = estimateRowCount(mq);
- double rows = input.estimateRowCount(mq);
for (AggregateCall aggCall : aggCalls) {
if (aggCall.isDistinct()) {
- mem += IgniteCost.AGG_CALL_MEM_COST * rows / grps;
+ ImmutableBitSet aggGroup =
ImmutableBitSet.of(aggCall.getArgList());
+
+ double distinctRows = guessDistinctRows(mq,
aggGroup.cardinality());
+
+ mem += IgniteCost.AGG_CALL_MEM_COST * distinctRows / grps;
} else {
mem += IgniteCost.AGG_CALL_MEM_COST;
}
@@ -92,6 +93,17 @@ public abstract class IgniteAggregate extends Aggregate
implements IgniteRel {
return mem;
}
+ /** Implements heuristics estimation for distinct row count. */
+ private double guessDistinctRows(RelMetadataQuery mq, int groupSize) {
+ if (groupSize == 0) {
+ return 1;
+ } else {
+ double rowCount = mq.getRowCount(getInput());
+ rowCount *= (1.0 - Math.pow(.8, groupSize));
+ return rowCount;
+ }
+ }
+
/**
* ComputeSelfCostHash.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
index 638060a75a..644b9c4251 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
@@ -339,6 +339,10 @@ public class TraitUtils {
public static List<RelNode> derive(Convention convention,
TraitsAwareIgniteRel rel, List<List<RelTraitSet>> inTraits) {
assert !nullOrEmpty(inTraits);
+ if (inTraits.stream().flatMap(List::stream).anyMatch(traitSet ->
traitSet.getConvention() != convention)) {
+ return List.of();
+ }
+
RelTraitSet outTraits = rel.getCluster().traitSetOf(convention);
Set<Pair<RelTraitSet, List<RelTraitSet>>> combinations =
combinations(outTraits, inTraits);
@@ -346,10 +350,6 @@ public class TraitUtils {
return List.of();
}
- if (inTraits.stream().flatMap(List::stream).anyMatch(traitSet ->
traitSet.getConvention() != convention)) {
- return List.of();
- }
-
PropagationContext context = new PropagationContext(combinations)
.propagate(rel::deriveCollation);
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractAggregatePlannerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractAggregatePlannerTest.java
index 288e662f77..c8ac94287d 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractAggregatePlannerTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractAggregatePlannerTest.java
@@ -29,10 +29,8 @@ import java.util.Objects;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
-import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.util.ImmutableBitSet;
import
org.apache.ignite.internal.sql.engine.framework.TestBuilders.TableBuilder;
import org.apache.ignite.internal.sql.engine.rel.IgniteAggregate;
import org.apache.ignite.internal.sql.engine.rel.agg.IgniteReduceAggregateBase;
@@ -40,7 +38,6 @@ import
org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Collation;
import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
-import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
import org.apache.ignite.internal.type.NativeTypes;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterAll;
@@ -1066,31 +1063,4 @@ public abstract class AbstractAggregatePlannerTest
extends AbstractPlannerTest {
return true;
};
}
-
- <T> Predicate<T> hasGroupSets(Function<T, List<ImmutableBitSet>>
groupSets, int groupKey) {
- return (node) -> {
- List<ImmutableBitSet> allGroupSets = groupSets.apply(node);
- ImmutableBitSet firstGroupSet = allGroupSets.get(0);
-
- boolean groupSetsMatch =
allGroupSets.equals(List.of(ImmutableBitSet.of(groupKey)));
- boolean groupSetMatches =
firstGroupSet.equals(ImmutableBitSet.of(groupKey));
-
- return groupSetMatches && groupSetsMatch;
- };
- }
-
- <T> Predicate<T> hasNoGroupSets(Function<T, List<ImmutableBitSet>>
groupSets) {
- return (node) -> {
- List<ImmutableBitSet> allGroupSets = groupSets.apply(node);
- List<ImmutableBitSet> emptyGroupSets =
List.of(ImmutableBitSet.of());
- return emptyGroupSets.equals(allGroupSets);
- };
- }
-
- <T extends RelNode> Predicate<T> hasCollation(RelCollation expected) {
- return (node) -> {
- RelCollation collation = TraitUtils.collation(node);
- return expected.equals(collation);
- };
- }
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
index 0bd79a619c..efbf99e7ae 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
@@ -54,6 +54,7 @@ import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.AbstractRelNode;
+import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelVisitor;
@@ -102,6 +103,7 @@ import
org.apache.ignite.internal.sql.engine.schema.IgniteTable;
import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
import org.apache.ignite.internal.sql.engine.util.Commons;
@@ -113,8 +115,7 @@ import org.apache.ignite.internal.util.Pair;
import org.jetbrains.annotations.Nullable;
/**
- * AbstractPlannerTest.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Base test class for planner tests.
*/
public abstract class AbstractPlannerTest extends IgniteAbstractTest {
protected static final String[] DISABLE_KEY_VALUE_MODIFY_RULES = {
@@ -130,11 +131,44 @@ public abstract class AbstractPlannerTest extends
IgniteAbstractTest {
protected static final int DEFAULT_ZONE_ID = 0;
+ private static final SqlExplainLevel DEFAULT_EXPLAIN_LEVEL =
SqlExplainLevel.EXPPLAN_ATTRIBUTES;
+
private static final AtomicInteger NEXT_TABLE_ID = new AtomicInteger(2001);
/** Last error message. */
String lastErrorMsg;
+ <T> Predicate<T> hasGroupSets(Function<T, List<ImmutableBitSet>>
groupSets, int groupKey) {
+ return hasGroupSets(groupSets, List.of(groupKey));
+ }
+
+ <T> Predicate<T> hasGroupSets(Function<T, List<ImmutableBitSet>>
groupSets, List<Integer> groupKeys) {
+ return (node) -> {
+ List<ImmutableBitSet> allGroupSets = groupSets.apply(node);
+ ImmutableBitSet firstGroupSet = allGroupSets.get(0);
+
+ boolean groupSetsMatch =
allGroupSets.equals(List.of(ImmutableBitSet.of(groupKeys)));
+ boolean groupSetMatches =
firstGroupSet.equals(ImmutableBitSet.of(groupKeys));
+
+ return groupSetMatches && groupSetsMatch;
+ };
+ }
+
+ <T> Predicate<T> hasNoGroupSets(Function<T, List<ImmutableBitSet>>
groupSets) {
+ return (node) -> {
+ List<ImmutableBitSet> allGroupSets = groupSets.apply(node);
+ List<ImmutableBitSet> emptyGroupSets =
List.of(ImmutableBitSet.of());
+ return emptyGroupSets.equals(allGroupSets);
+ };
+ }
+
+ <T extends RelNode> Predicate<T> hasCollation(RelCollation expected) {
+ return (node) -> {
+ RelCollation collation = TraitUtils.collation(node);
+ return expected.equals(collation);
+ };
+ }
+
interface TestVisitor {
void visit(RelNode node, int ordinal, RelNode parent);
}
@@ -485,7 +519,7 @@ public abstract class AbstractPlannerTest extends
IgniteAbstractTest {
) throws Exception {
IgniteRel plan = physicalPlan(sql, schemas, hintStrategies, params,
null, disabledRules);
- String planString = RelOptUtil.dumpPlan("", plan,
SqlExplainFormat.TEXT, SqlExplainLevel.ALL_ATTRIBUTES);
+ String planString = RelOptUtil.dumpPlan("", plan,
SqlExplainFormat.TEXT, DEFAULT_EXPLAIN_LEVEL);
log.info("statement: {}\n{}", sql, planString);
checkSplitAndSerialization(plan, schemas);
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
index d2b0e8a423..c41b377775 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
@@ -31,6 +31,7 @@ import
org.apache.ignite.internal.sql.engine.rel.IgniteCorrelatedNestedLoopJoin;
import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
import org.apache.ignite.internal.sql.engine.rel.IgniteLimit;
import org.apache.ignite.internal.sql.engine.rel.IgniteMergeJoin;
+import org.apache.ignite.internal.sql.engine.rel.IgniteProject;
import org.apache.ignite.internal.sql.engine.rel.IgniteSort;
import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
import
org.apache.ignite.internal.sql.engine.rel.agg.IgniteColocatedHashAggregate;
@@ -118,9 +119,8 @@ public class AggregatePlannerTest extends
AbstractAggregatePlannerTest {
checkSimpleAggWithGroupByHash(TestCase.CASE_5A);
checkSimpleAggWithGroupByHash(TestCase.CASE_5B);
- // TODO replace with calls to test methods after
https://issues.apache.org/jira/browse/IGNITE-20083 is resolved
- assumeRun("checkSimpleAggWithGroupByHash", TestCase.CASE_6A);
- assumeRun("checkSimpleAggWithGroupByHash", TestCase.CASE_6B);
+ checkSimpleAggWithGroupByHash(TestCase.CASE_6A);
+ checkSimpleAggWithGroupByHash(TestCase.CASE_6B);
checkSimpleAggWithColocatedGroupByHash(TestCase.CASE_5C);
checkSimpleAggWithColocatedGroupByHash(TestCase.CASE_5D);
@@ -250,22 +250,8 @@ public class AggregatePlannerTest extends
AbstractAggregatePlannerTest {
public void distinctAggregateInWhereClause() throws Exception {
checkGroupWithNoAggregateSingle(TestCase.CASE_15);
- assertPlan(TestCase.CASE_15A,
- nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class)
- .and(not(hasAggregate()))
- .and(hasGroups())
- .and(input(isInstanceOf(IgniteExchange.class)
- .and(input(isTableScan("TEST")))
- ))
- ));
- assertPlan(TestCase.CASE_15B,
- nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class)
- .and(not(hasAggregate()))
- .and(hasGroups())
- .and(input(isInstanceOf(IgniteExchange.class)
- .and(input(isTableScan("TEST")))
- ))
- ));
+ checkGroupWithNoAggregateHash(TestCase.CASE_15A);
+ checkGroupWithNoAggregateHash(TestCase.CASE_15B);
}
/**
@@ -367,14 +353,14 @@ public class AggregatePlannerTest extends
AbstractAggregatePlannerTest {
*/
@Test
public void aggregateWithOrderByGroupColumns() throws Exception {
- checkGroupsWithOrderByGroupColumnsSingle(TestCase.CASE_19_1,
TraitUtils.createCollation(List.of(0, 1)));
- checkGroupsWithOrderByGroupColumnsSingle(TestCase.CASE_19_2,
TraitUtils.createCollation(List.of(1, 0)));
+ checkGroupsWithOrderByGroupColumnsSingle(TestCase.CASE_19_1,
TraitUtils.createCollation(List.of(0)));
+ checkGroupsWithOrderByGroupColumnsSingle(TestCase.CASE_19_2,
TraitUtils.createCollation(List.of(1)));
- checkGroupsWithOrderByGroupColumnsHash(TestCase.CASE_19_1A,
TraitUtils.createCollation(List.of(0, 1)));
- checkGroupsWithOrderByGroupColumnsHash(TestCase.CASE_19_2A,
TraitUtils.createCollation(List.of(1, 0)));
+ checkGroupsWithOrderByGroupColumnsHash(TestCase.CASE_19_1A,
TraitUtils.createCollation(List.of(0)));
+ checkGroupsWithOrderByGroupColumnsHash(TestCase.CASE_19_2A,
TraitUtils.createCollation(List.of(1)));
- checkGroupsWithOrderByGroupColumnsHash(TestCase.CASE_19_1B,
TraitUtils.createCollation(List.of(0, 1)));
- checkGroupsWithOrderByGroupColumnsHash(TestCase.CASE_19_2B,
TraitUtils.createCollation(List.of(1, 0)));
+ checkGroupsWithOrderByGroupColumnsHash(TestCase.CASE_19_1B,
TraitUtils.createCollation(List.of(0)));
+ checkGroupsWithOrderByGroupColumnsHash(TestCase.CASE_19_2B,
TraitUtils.createCollation(List.of(1)));
}
/**
@@ -382,32 +368,10 @@ public class AggregatePlannerTest extends
AbstractAggregatePlannerTest {
*/
@Test
public void aggregateWithGroupBySubsetOrderByColumns() throws Exception {
- assertPlan(TestCase.CASE_20,
- isInstanceOf(IgniteSort.class)
- .and(s ->
s.collation().equals(TraitUtils.createCollation(List.of(0, 1, 2))))
-
.and(nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class)
- .and(input(isTableScan("TEST")))
- ))
- );
+ checkGroupsWithOrderByGroupColumnsSingle(TestCase.CASE_20,
TraitUtils.createCollation(List.of(0, 1, 2)));
- assertPlan(TestCase.CASE_20A,
- isInstanceOf(IgniteSort.class)
- .and(s ->
s.collation().equals(TraitUtils.createCollation(List.of(0, 1, 2))))
-
.and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
- .and(input(isInstanceOf(IgniteExchange.class)
- .and(input(isTableScan("TEST")))
- ))
- ))
- );
- assertPlan(TestCase.CASE_20B,
- isInstanceOf(IgniteSort.class)
- .and(s ->
s.collation().equals(TraitUtils.createCollation(List.of(0, 1, 2))))
-
.and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
- .and(input(isInstanceOf(IgniteExchange.class)
- .and(input(isTableScan("TEST")))
- ))
- ))
- );
+ checkGroupsWithOrderByGroupColumnsHash(TestCase.CASE_20A,
TraitUtils.createCollation(List.of(0, 1, 2)));
+ checkGroupsWithOrderByGroupColumnsHash(TestCase.CASE_20B,
TraitUtils.createCollation(List.of(0, 1, 2)));
}
@@ -470,7 +434,7 @@ public class AggregatePlannerTest extends
AbstractAggregatePlannerTest {
Predicate<AggregateCall> countReduce = (a) ->
Objects.equals(a.getAggregation().getName(), "$SUM0") &&
a.getArgList().equals(List.of(1));
- Predicate<RelNode> nonColocatedGroupBy =
hasChildThat(isInstanceOf(IgniteReduceHashAggregate.class)
+ Predicate<RelNode> nonColocatedGroupBy =
nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class)
.and(in ->
hasAggregates(countReduce).test(in.getAggregateCalls()))
.and(input(isInstanceOf(IgniteExchange.class)
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
@@ -497,17 +461,37 @@ public class AggregatePlannerTest extends
AbstractAggregatePlannerTest {
/** Validate that we choose single phase AVG aggregate for AVG by default.
*/
@Test
public void avgAgg() throws Exception {
- Predicate<AggregateCall> countMap = (a) ->
Objects.equals(a.getAggregation().getName(), "AVG") &&
a.getArgList().equals(List.of(1));
+ Predicate<AggregateCall> colocated = (a) ->
+ Objects.equals(a.getAggregation().getName(), "AVG") &&
a.getArgList().equals(List.of(1));
- Predicate<IgniteColocatedHashAggregate> nonColocated =
isInstanceOf(IgniteColocatedHashAggregate.class)
- .and(in -> hasAggregates(countMap).test(in.getAggCallList()))
- .and(input(isInstanceOf(IgniteExchange.class)
- .and(hasDistribution(single()))));
+ Predicate<AggregateCall> sumMap = (a) ->
+ Objects.equals(a.getAggregation().getName(), "SUM") &&
a.getArgList().equals(List.of(1));
+
+ Predicate<AggregateCall> countMap = (a) ->
+ Objects.equals(a.getAggregation().getName(), "COUNT") &&
a.getArgList().equals(List.of(1));
+
+ Predicate<AggregateCall> sumReduce = (a) ->
+ Objects.equals(a.getAggregation().getName(), "SUM") &&
a.getArgList().equals(List.of(1));
+
+ Predicate<AggregateCall> sum0Reduce = (a) ->
+ Objects.equals(a.getAggregation().getName(), "$SUM0") &&
a.getArgList().equals(List.of(2));
+
+ Predicate<RelNode> nonColocated =
hasChildThat(isInstanceOf(IgniteReduceHashAggregate.class)
+ .and(in -> hasAggregates(sumReduce,
sum0Reduce).test(in.getAggregateCalls()))
+ .and(input(isInstanceOf(IgniteProject.class)
+ .and(input(isInstanceOf(IgniteExchange.class)
+ .and(hasDistribution(single()))
+
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
+ .and(in ->
hasAggregates(sumMap, countMap).test(in.getAggCallList()))
+ )
+ ))
+ ))));
Predicate<IgniteExchange> colocatedGroupBy =
isInstanceOf(IgniteExchange.class)
.and(hasDistribution(single()))
.and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
- .and(in ->
hasAggregates(countMap).test(in.getAggCallList()))
+ .and(in ->
hasAggregates(colocated).test(in.getAggCallList()))
+ .and(hasGroups())
.and(input(isTableScan("TEST")))
));
@@ -538,20 +522,15 @@ public class AggregatePlannerTest extends
AbstractAggregatePlannerTest {
checkCountDistinctHash(TestCase.CASE_24_1B);
checkCountDistinctHash(TestCase.CASE_24_1D);
- Predicate<RelNode> colocated =
nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
- .and(hasNoGroupSets(IgniteReduceSortAggregate::getGroupSets))
+ Predicate<RelNode> colocated =
nodeOrAnyChild(isInstanceOf(IgniteColocatedSortAggregate.class)
+
.and(hasNoGroupSets(IgniteColocatedSortAggregate::getGroupSets))
.and(input(isInstanceOf(IgniteExchange.class)
.and(hasDistribution(single())
-
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
-
.and(hasNoGroupSets(IgniteMapSortAggregate::getGroupSets))
-
.and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
-
.and(hasGroupSets(IgniteColocatedHashAggregate::getGroupSets, 1))
- ))
+
.and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
+
.and(hasGroupSets(IgniteColocatedHashAggregate::getGroupSets, 1))
))
- )
- ))
-
- );
+ ))
+ ));
assertPlan(TestCase.CASE_24_1C, colocated);
assertPlan(TestCase.CASE_24_1E, colocated);
@@ -640,39 +619,39 @@ public class AggregatePlannerTest extends
AbstractAggregatePlannerTest {
assertPlan(testCase,
isInstanceOf(IgniteColocatedHashAggregate.class)
.and(hasAggregate())
- .and(not(hasDistinctAggregate()))
-
.and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
- .and(hasGroups())
- .and(input(isTableScan("TEST")))
- ))
+ .and(hasDistinctAggregate())
+ .and(hasGroups())
+ .and(input(isTableScan("TEST")))
);
}
private void checkDistinctAggHash(TestCase testCase) throws Exception {
assertPlan(testCase,
- isInstanceOf(IgniteColocatedHashAggregate.class)
+ nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class)
.and(hasAggregate())
.and(not(hasDistinctAggregate()))
-
.and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
+
.and(input(isInstanceOf(IgniteReduceHashAggregate.class)
+ .and(not(hasAggregate()))
.and(hasGroups())
.and(input(isInstanceOf(IgniteExchange.class)
- .and(input(isTableScan("TEST")))
+
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
+ .and(not(hasAggregate()))
+
.and(input(isTableScan("TEST")))
+ ))
))
))
- );
+ ));
}
private void checkColocatedDistinctAggHash(TestCase testCase) throws
Exception {
assertPlan(testCase,
- nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
+ nodeOrAnyChild(isInstanceOf(IgniteColocatedSortAggregate.class)
+ .and(hasAggregate())
+ .and(not(hasDistinctAggregate()))
.and(input(isInstanceOf(IgniteExchange.class)
-
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
- .and(hasAggregate())
- .and(not(hasDistinctAggregate()))
-
.and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
- .and(hasGroups())
-
.and(input(isTableScan("TEST")))
- ))
+
.and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
+ .and(hasGroups())
+ .and(input(isTableScan("TEST")))
))
))
));
@@ -688,16 +667,25 @@ public class AggregatePlannerTest extends
AbstractAggregatePlannerTest {
);
}
+
private void checkDistinctAggWithGroupByHash(TestCase testCase) throws
Exception {
assertPlan(testCase,
nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class)
- .and(hasDistinctAggregate())
+ .and(hasAggregate())
+ .and(not(hasDistinctAggregate()))
.and(hasGroups())
- .and(input(isInstanceOf(IgniteExchange.class)
- .and(input(isTableScan("TEST")))
+
.and(input(isInstanceOf(IgniteReduceHashAggregate.class)
+ .and(not(hasAggregate()))
+ .and(hasGroups())
+ .and(input(isInstanceOf(IgniteExchange.class)
+
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
+ .and(not(hasAggregate()))
+ .and(hasGroups())
+
.and(input(isTableScan("TEST")))
+ ))
+ ))
))
- )
- );
+ ));
}
private void checkDistinctAggWithColocatedGroupByHash(TestCase testCase)
throws Exception {
@@ -755,11 +743,15 @@ public class AggregatePlannerTest extends
AbstractAggregatePlannerTest {
private void checkGroupWithNoAggregateHash(TestCase testCase) throws
Exception {
assertPlan(testCase,
- nodeOrAnyChild(isInstanceOf(IgniteColocatedHashAggregate.class)
+ nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class)
.and(not(hasAggregate()))
.and(hasGroups())
.and(input(isInstanceOf(IgniteExchange.class)
- .and(input(isTableScan("TEST")))
+
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
+ .and(not(hasAggregate()))
+ .and(hasGroups())
+ .and(input(isTableScan("TEST")))
+ ))
))
));
}
@@ -812,9 +804,9 @@ public class AggregatePlannerTest extends
AbstractAggregatePlannerTest {
private void checkGroupsWithOrderByGroupColumnsSingle(TestCase testCase,
RelCollation collation) throws Exception {
assertPlan(testCase,
- isInstanceOf(IgniteColocatedSortAggregate.class)
- .and(input(isInstanceOf(IgniteSort.class)
- .and(s -> s.collation().equals(collation))
+ isInstanceOf(IgniteSort.class)
+ .and(s -> s.collation().equals(collation))
+
.and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
.and(input(isTableScan("TEST")))
))
);
@@ -822,12 +814,15 @@ public class AggregatePlannerTest extends
AbstractAggregatePlannerTest {
private void checkGroupsWithOrderByGroupColumnsHash(TestCase testCase,
RelCollation collation) throws Exception {
assertPlan(testCase,
- isInstanceOf(IgniteColocatedSortAggregate.class)
- .and(input(isInstanceOf(IgniteExchange.class)
- .and(hasDistribution(single()))
- .and(input(isInstanceOf(IgniteSort.class)
- .and(s ->
s.collation().equals(collation))
- .and(input(isTableScan("TEST")))
+ isInstanceOf(IgniteSort.class)
+ .and(s -> s.collation().equals(collation))
+ .and(input(isInstanceOf(IgniteProject.class)
+
.and(input(isInstanceOf(IgniteReduceHashAggregate.class)
+
.and(input(isInstanceOf(IgniteExchange.class)
+
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
+
.and(input(isTableScan("TEST")))
+ ))
+ ))
))
))
);
@@ -881,31 +876,32 @@ public class AggregatePlannerTest extends
AbstractAggregatePlannerTest {
}
private void
checkDerivedCollationWithOrderBySubsetOfGroupColumnsSingle(TestCase testCase)
throws Exception {
- RelCollation requiredCollation = RelCollations.of(
- TraitUtils.createFieldCollation(1, Collation.DESC_NULLS_FIRST),
- TraitUtils.createFieldCollation(0, Collation.ASC_NULLS_LAST)
+ RelCollation outputCollation = RelCollations.of(
+ TraitUtils.createFieldCollation(1, Collation.DESC_NULLS_FIRST)
);
- assertPlan(testCase, isInstanceOf(IgniteColocatedSortAggregate.class)
- .and(hasCollation(requiredCollation))
- .and(input(isInstanceOf(IgniteSort.class)
- .and(hasCollation(requiredCollation))
- )));
+ assertPlan(testCase,
+ isInstanceOf(IgniteSort.class)
+ .and(hasCollation(outputCollation))
+
.and(input(isInstanceOf(IgniteColocatedHashAggregate.class)))
+ );
}
private void
checkDerivedCollationWithOrderBySubsetOfGroupColumnsHash(TestCase testCase)
throws Exception {
- RelCollation requiredCollation = RelCollations.of(
- TraitUtils.createFieldCollation(1, Collation.DESC_NULLS_FIRST),
- TraitUtils.createFieldCollation(0, Collation.ASC_NULLS_LAST)
+ RelCollation outputCollation = RelCollations.of(
+ TraitUtils.createFieldCollation(1, Collation.DESC_NULLS_FIRST)
);
- assertPlan(testCase, isInstanceOf(IgniteColocatedSortAggregate.class)
- .and(hasCollation(requiredCollation))
- .and(input(isInstanceOf(IgniteExchange.class)
- .and(hasDistribution(single()))
- .and(input(isInstanceOf(IgniteSort.class)
- .and(hasCollation(requiredCollation))
- ))
- )));
+ assertPlan(testCase,
+ isInstanceOf(IgniteSort.class)
+ .and(hasCollation(outputCollation))
+ .and(input(isInstanceOf(IgniteProject.class)
+
.and(input(isInstanceOf(IgniteReduceHashAggregate.class)
+
.and(input(isInstanceOf(IgniteExchange.class)
+ .and(hasDistribution(single()))
+
.and(input(isInstanceOf(IgniteMapHashAggregate.class)))
+ ))
+ ))
+ )));
}
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java
index 93b63b1c0d..fec1474ed0 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedHashAggregatePlannerTest.java
@@ -591,11 +591,9 @@ public class ColocatedHashAggregatePlannerTest extends
AbstractAggregatePlannerT
assertPlan(testCase,
isInstanceOf(IgniteColocatedHashAggregate.class)
.and(hasAggregate())
- .and(not(hasDistinctAggregate()))
-
.and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
- .and(hasGroups())
- .and(input(isTableScan("TEST")))
- )),
+ .and(hasDistinctAggregate())
+ .and(hasGroups())
+ .and(input(isTableScan("TEST"))),
disableRules
);
}
@@ -604,12 +602,10 @@ public class ColocatedHashAggregatePlannerTest extends
AbstractAggregatePlannerT
assertPlan(testCase,
isInstanceOf(IgniteColocatedHashAggregate.class)
.and(hasAggregate())
- .and(not(hasDistinctAggregate()))
-
.and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
- .and(hasGroups())
- .and(input(isInstanceOf(IgniteExchange.class)
- .and(input(isTableScan("TEST")))
- ))
+ .and(hasDistinctAggregate())
+ .and(hasGroups())
+ .and(input(isInstanceOf(IgniteExchange.class)
+ .and(input(isTableScan("TEST")))
)),
disableRules
);
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java
index b09e421d5b..8230d06a47 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java
@@ -141,22 +141,21 @@ public class MapReduceHashAggregatePlannerTest extends
AbstractAggregatePlannerT
checkDistinctAggWithGroupBySingle(TestCase.CASE_7_2);
checkDistinctAggWithGroupBySingle(TestCase.CASE_7_3);
- // TODO replace with calls to test methods after https://issues.apache.org/jira/browse/IGNITE-20083 is fixed
- assumeRun("checkDistinctAggWithGroupByHash", TestCase.CASE_7_1A);
- assumeRun("checkDistinctAggWithGroupByHash", TestCase.CASE_7_2A);
- assumeRun("checkDistinctAggWithGroupByHash", TestCase.CASE_7_3A);
+ checkDistinctAggWithGroupByHash(TestCase.CASE_7_1A);
+ checkDistinctAggWithGroupByHash(TestCase.CASE_7_2A);
+ checkDistinctAggWithGroupByHash(TestCase.CASE_7_3A);
- assumeRun("checkDistinctAggWithGroupByHash", TestCase.CASE_7_1B);
- assumeRun("checkDistinctAggWithGroupByHash", TestCase.CASE_7_2B);
- assumeRun("checkDistinctAggWithGroupByHash", TestCase.CASE_7_3B);
+ checkDistinctAggWithGroupByHash(TestCase.CASE_7_1B);
+ checkDistinctAggWithGroupByHash(TestCase.CASE_7_2B);
+ checkDistinctAggWithGroupByHash(TestCase.CASE_7_3B);
- assumeRun("checkDistinctAggWithColocatedGroupByHash", TestCase.CASE_7_1C);
- assumeRun("checkDistinctAggWithColocatedGroupByHash", TestCase.CASE_7_2C);
- assumeRun("checkDistinctAggWithColocatedGroupByHash", TestCase.CASE_7_3C);
+ checkDistinctAggWithColocatedGroupByHash(TestCase.CASE_7_1C);
+ checkDistinctAggWithColocatedGroupByHash(TestCase.CASE_7_2C);
+ checkDistinctAggWithColocatedGroupByHash(TestCase.CASE_7_3C);
- assumeRun("checkDistinctAggWithColocatedGroupByHash", TestCase.CASE_7_1D);
- assumeRun("checkDistinctAggWithColocatedGroupByHash", TestCase.CASE_7_2D);
- assumeRun("checkDistinctAggWithColocatedGroupByHash", TestCase.CASE_7_3D);
+ checkDistinctAggWithColocatedGroupByHash(TestCase.CASE_7_1D);
+ checkDistinctAggWithColocatedGroupByHash(TestCase.CASE_7_2D);
+ checkDistinctAggWithColocatedGroupByHash(TestCase.CASE_7_3D);
}
/**
@@ -214,11 +213,11 @@ public class MapReduceHashAggregatePlannerTest extends
AbstractAggregatePlannerT
@Test
public void distinctWithoutAggregate() throws Exception {
checkGroupWithNoAggregateSingle(TestCase.CASE_12);
- // TODO replace with calls to test methods after https://issues.apache.org/jira/browse/IGNITE-20083 is resolved
- assumeRun("checkGroupWithNoAggregateHash", TestCase.CASE_12A);
- assumeRun("checkGroupWithNoAggregateHash", TestCase.CASE_12B);
- assumeRun("checkGroupWithNoAggregateHash", TestCase.CASE_12C);
- assumeRun("checkGroupWithNoAggregateHash", TestCase.CASE_12D);
+
+ checkGroupWithNoAggregateHash(TestCase.CASE_12A);
+ checkGroupWithNoAggregateHash(TestCase.CASE_12B);
+ checkGroupWithNoAggregateHash(TestCase.CASE_12C);
+ checkGroupWithNoAggregateHash(TestCase.CASE_12D);
}
/**
@@ -227,11 +226,11 @@ public class MapReduceHashAggregatePlannerTest extends
AbstractAggregatePlannerT
@Test
public void distinctWithoutAggregateUseIndex() throws Exception {
checkGroupWithNoAggregateSingle(TestCase.CASE_13);
- // TODO replace with calls to test methods after https://issues.apache.org/jira/browse/IGNITE-20083 is resolved
- assumeRun("checkGroupWithNoAggregateHash", TestCase.CASE_13A);
- assumeRun("checkGroupWithNoAggregateHash", TestCase.CASE_13B);
- assumeRun("checkGroupWithNoAggregateHash", TestCase.CASE_13C);
- assumeRun("checkGroupWithNoAggregateHash", TestCase.CASE_13D);
+
+ checkGroupWithNoAggregateHash(TestCase.CASE_13A);
+ checkGroupWithNoAggregateHash(TestCase.CASE_13B);
+ checkGroupWithNoAggregateHash(TestCase.CASE_13C);
+ checkGroupWithNoAggregateHash(TestCase.CASE_13D);
}
/**
@@ -436,15 +435,14 @@ public class MapReduceHashAggregatePlannerTest extends
AbstractAggregatePlannerT
))
);
- // TODO Replace with uncommented code after https://issues.apache.org/jira/browse/IGNITE-20083 is resolved
- assumeRun("", TestCase.CASE_21A);
- assumeRun("", TestCase.CASE_21B);
- /*
assertPlan(TestCase.CASE_21A,
nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
.and(input(0, subtreePredicate))
.and(input(1, subtreePredicate))
), disableRules);
- */
+ assertPlan(TestCase.CASE_21B, nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
+ .and(input(0, subtreePredicate))
+ .and(input(1, subtreePredicate))
+ ), disableRules);
}
/**
@@ -708,6 +706,29 @@ public class MapReduceHashAggregatePlannerTest extends
AbstractAggregatePlannerT
);
}
+ private void checkDistinctAggWithColocatedGroupByHash(TestCase testCase) throws Exception {
+ assertPlan(testCase,
+ nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class)
+ .and(hasAggregate())
+ .and(not(hasDistinctAggregate()))
+ .and(input(isInstanceOf(IgniteMapHashAggregate.class)
+ .and(input(isInstanceOf(IgniteReduceHashAggregate.class)
+ .and(not(hasAggregate()))
+ .and(hasGroups())
+ .and(input(isInstanceOf(IgniteExchange.class)
+ .and(input(isInstanceOf(IgniteMapHashAggregate.class)
+ .and(not(hasAggregate()))
+ .and(hasGroups())
+ .and(input(isTableScan("TEST")))
+ ))
+ ))
+ ))
+ ))
+ ),
+ disableRules
+ );
+ }
+
private void checkAggWithGroupByIndexColumnsSingle(TestCase testCase)
throws Exception {
assertPlan(testCase,
nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class)
@@ -787,10 +808,8 @@ public class MapReduceHashAggregatePlannerTest extends
AbstractAggregatePlannerT
.and(s -> s.collation().equals(collation))
.and(input(isInstanceOf(IgniteProject.class)
.and(input(isInstanceOf(IgniteReduceHashAggregate.class)
-
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
- // TODO:
https://issues.apache.org/jira/browse/IGNITE-20095
- // Why can't Map be pushed
down to under 'exchange'.
-
.and(input(isInstanceOf(IgniteExchange.class)
+
.and(input(isInstanceOf(IgniteExchange.class)
+
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
.and(input(isTableScan("TEST")))
))
))
@@ -878,12 +897,9 @@ public class MapReduceHashAggregatePlannerTest extends
AbstractAggregatePlannerT
.and(hasCollation(outputCollation))
.and(input(isInstanceOf(IgniteProject.class)
.and(input(isInstanceOf(IgniteReduceHashAggregate.class)
-
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
- // TODO:
https://issues.apache.org/jira/browse/IGNITE-20095
- // Why can't Map be pushed
down to under 'exchange'.
-
.and(input(isInstanceOf(IgniteExchange.class)
-
.and(hasDistribution(IgniteDistributions.single()))
- ))
+
.and(input(isInstanceOf(IgniteExchange.class)
+
.and(hasDistribution(IgniteDistributions.single()))
+
.and(input(isInstanceOf(IgniteMapHashAggregate.class)))
))
))
)),
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java
index 34925da593..14d8da0739 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java
@@ -522,7 +522,7 @@ public class MapReduceSortAggregatePlannerTest extends
AbstractAggregatePlannerT
Predicate<IgniteReduceSortAggregate> inputAgg =
isInstanceOf(IgniteReduceSortAggregate.class)
.and(hasGroupSets(IgniteReduceSortAggregate::getGroupSets, 0))
.and(hasCollation(RelCollations.of(0)))
- .and(input(isInstanceOf(IgniteMapSortAggregate.class)
+ .and(input(isInstanceOf(IgniteMapSortAggregate.class)
.and(hasCollation(RelCollations.of(1)))
.and(hasGroupSets(Aggregate::getGroupSets, 1))
));
@@ -846,11 +846,9 @@ public class MapReduceSortAggregatePlannerTest extends
AbstractAggregatePlannerT
private void checkGroupsWithOrderBySubsetOfGroupColumnsHash(TestCase
testCase, RelCollation collation) throws Exception {
assertPlan(testCase,
nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
- .and(input(isInstanceOf(IgniteMapSortAggregate.class)
- // TODO:
https://issues.apache.org/jira/browse/IGNITE-20095
- // Why can't Map be pushed down to under
'exchange'.
- .and(input(isInstanceOf(IgniteExchange.class)
- .and(hasDistribution(single()))
+ .and(input(isInstanceOf(IgniteExchange.class)
+ .and(hasDistribution(single()))
+
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
.and(input(isInstanceOf(IgniteSort.class)
.and(s ->
s.collation().equals(collation))
.and(input(isTableScan("TEST")))
@@ -868,8 +866,6 @@ public class MapReduceSortAggregatePlannerTest extends
AbstractAggregatePlannerT
.and(input(isInstanceOf(IgniteProject.class)
.and(input(isInstanceOf(IgniteReduceSortAggregate.class)
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
- // TODO:
https://issues.apache.org/jira/browse/IGNITE-20095
- // Why can't Map be pushed
down to under 'exchange'.
.and(input(isInstanceOf(IgniteSort.class)
.and(s ->
s.collation().equals(collation))
.and(input(isTableScan("TEST")
@@ -888,11 +884,9 @@ public class MapReduceSortAggregatePlannerTest extends
AbstractAggregatePlannerT
.and(s ->
s.collation().equals(TraitUtils.createCollation(List.of(0, 1, 2))))
.and(input(isInstanceOf(IgniteProject.class)
.and(input(isInstanceOf(IgniteReduceSortAggregate.class)
-
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
- // TODO:
https://issues.apache.org/jira/browse/IGNITE-20095
- // Why can't Map be pushed
down to under 'exchange'.
-
.and(input(isInstanceOf(IgniteExchange.class)
-
.and(hasDistribution(single()))
+
.and(input(isInstanceOf(IgniteExchange.class)
+ .and(hasDistribution(single()))
+
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
.and(input(isInstanceOf(IgniteSort.class)
.and(s ->
s.collation().equals(collation))
.and(input(isTableScan("TEST")
@@ -994,11 +988,9 @@ public class MapReduceSortAggregatePlannerTest extends
AbstractAggregatePlannerT
isInstanceOf(IgniteProject.class)
.and(hasCollation(outputCollation))
.and(input(isInstanceOf(IgniteReduceSortAggregate.class)
-
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
- // TODO:
https://issues.apache.org/jira/browse/IGNITE-20095
- // Why can't Map be pushed down to
under 'exchange'.
-
.and(input(isInstanceOf(IgniteExchange.class)
- .and(hasDistribution(single()))
+ .and(input(isInstanceOf(IgniteExchange.class)
+ .and(hasDistribution(single()))
+
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
.and(input(isInstanceOf(IgniteSort.class)
.and(hasCollation(requiredCollation))
))
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/TpchQueryPlannerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/TpchQueryPlannerTest.java
new file mode 100644
index 0000000000..6fb4329ea9
--- /dev/null
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/TpchQueryPlannerTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.planner;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import com.google.common.io.CharStreams;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
+import org.apache.ignite.internal.sql.engine.framework.TestCluster;
+import org.apache.ignite.internal.sql.engine.framework.TestNode;
+import org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan;
+import org.apache.ignite.internal.sql.engine.util.Cloner;
+import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.sql.engine.util.tpch.TpchHelper;
+import org.apache.ignite.internal.sql.engine.util.tpch.TpchTables;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests ensures a planner generates optimal plan for TPC-H queries.
+ *
+ * @see org.apache.ignite.internal.sql.engine.benchmarks.TpchParseBenchmark
+ */
+// TODO https://issues.apache.org/jira/browse/IGNITE-21986 validate other query plans and make test parameterized.
+public class TpchQueryPlannerTest extends AbstractPlannerTest {
+ private static TestCluster CLUSTER;
+
+ @BeforeAll
+ static void startCluster() {
+ CLUSTER = TestBuilders.cluster().nodes("N1").build();
+ CLUSTER.start();
+
+ TestNode node = CLUSTER.node("N1");
+
+ node.initSchema(TpchTables.LINEITEM.ddlScript());
+ }
+
+ @AfterAll
+ static void stopCluster() throws Exception {
+ CLUSTER.stop();
+ CLUSTER = null;
+ }
+
+ @Test
+ public void tpchTest_q1() {
+ validateQueryPlan("1");
+ }
+
+ private static void validateQueryPlan(String queryId) {
+ TestNode node = CLUSTER.node("N1");
+
+ MultiStepPlan plan = (MultiStepPlan) node.prepare(TpchHelper.getQuery(queryId));
+
+ String actualPlan = RelOptUtil.toString(Cloner.clone(plan.root(), Commons.cluster()), SqlExplainLevel.DIGEST_ATTRIBUTES);
+ String expectedPlan = getQueryPlan(queryId);
+
+ assertEquals(expectedPlan, actualPlan);
+ }
+
+ /**
+ * Loads query plan for provided TPC-H query id.
+ *
+ * @see TpchHelper#getQuery(String) for query id details.
+ */
+ public static String getQueryPlan(String queryId) {
+ // variant query ends with "v"
+ boolean variant = queryId.endsWith("v");
+ int numericId;
+
+ if (variant) {
+ String idString = queryId.substring(0, queryId.length() - 1);
+ numericId = Integer.parseInt(idString);
+ } else {
+ numericId = Integer.parseInt(queryId);
+ }
+
+ if (variant) {
+ var variantQueryFile = String.format("tpch/plan/variant_q%d.plan", numericId);
+ return loadFromResource(variantQueryFile);
+ } else {
+ var queryFile = String.format("tpch/plan/q%s.plan", numericId);
+ return loadFromResource(queryFile);
+ }
+ }
+
+ static String loadFromResource(String resource) {
+ try (InputStream is = TpchHelper.class.getClassLoader().getResourceAsStream(resource)) {
+ if (is == null) {
+ throw new IllegalArgumentException("Resource does not exist: " + resource);
+ }
+ try (InputStreamReader reader = new InputStreamReader(is, StandardCharsets.UTF_8)) {
+ return CharStreams.toString(reader);
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException("I/O operation failed: " + resource, e);
+ }
+ }
+}
diff --git a/modules/sql-engine/src/test/resources/mapping/correlated.test
b/modules/sql-engine/src/test/resources/mapping/correlated.test
index 09750faf36..779cd1ac5b 100644
--- a/modules/sql-engine/src/test/resources/mapping/correlated.test
+++ b/modules/sql-engine/src/test/resources/mapping/correlated.test
@@ -211,7 +211,7 @@ Fragment#0 root
Project
CorrelatedNestedLoopJoin
Receiver(sourceFragment=1, exchange=1, distribution=single)
- ColocatedHashAggregate
+ ReduceHashAggregate
Receiver(sourceFragment=2, exchange=2, distribution=single)
Fragment#2 correlated
@@ -222,7 +222,8 @@ Fragment#2 correlated
pruningMetadata: [3=[{0=$cor0.ID}]]
tree:
Sender(targetFragment=0, exchange=2, distribution=single)
- TableScan(name=PUBLIC.T2_N0N1N2, source=3, partitions=3,
distribution=random)
+ MapHashAggregate
+ TableScan(name=PUBLIC.T2_N0N1N2, source=3, partitions=3,
distribution=random)
Fragment#1
targetNodes: [N0]
@@ -250,7 +251,7 @@ Fragment#0 root
Project
CorrelatedNestedLoopJoin
Receiver(sourceFragment=2, exchange=2, distribution=single)
- ColocatedHashAggregate
+ ReduceHashAggregate
Receiver(sourceFragment=3, exchange=3, distribution=single)
Fragment#3 correlated
@@ -261,7 +262,8 @@ Fragment#3 correlated
pruningMetadata: [4=[{0=$cor1.ID}]]
tree:
Sender(targetFragment=0, exchange=3, distribution=single)
- TableScan(name=PUBLIC.T2_N0N1N2, source=4, partitions=3,
distribution=random)
+ MapHashAggregate
+ TableScan(name=PUBLIC.T2_N0N1N2, source=4, partitions=3,
distribution=random)
Fragment#2 correlated
targetNodes: [N0]
@@ -302,7 +304,7 @@ Fragment#0 root
Project
CorrelatedNestedLoopJoin
Receiver(sourceFragment=2, exchange=2, distribution=single)
- ColocatedHashAggregate
+ ReduceHashAggregate
Receiver(sourceFragment=3, exchange=3, distribution=single)
Fragment#3 correlated
@@ -313,7 +315,8 @@ Fragment#3 correlated
pruningMetadata: [4=[{0=$cor0.ID}, {0=$cor2.ID}]]
tree:
Sender(targetFragment=0, exchange=3, distribution=single)
- TableScan(name=PUBLIC.T2_N0N1N2, source=4, partitions=3,
distribution=random)
+ MapHashAggregate
+ TableScan(name=PUBLIC.T2_N0N1N2, source=4, partitions=3,
distribution=random)
Fragment#2 correlated
targetNodes: [N0]
diff --git
a/modules/sql-engine/src/test/resources/mapping/test_partition_pruning.test
b/modules/sql-engine/src/test/resources/mapping/test_partition_pruning.test
index 1451a4d63b..8fc500b503 100644
--- a/modules/sql-engine/src/test/resources/mapping/test_partition_pruning.test
+++ b/modules/sql-engine/src/test/resources/mapping/test_partition_pruning.test
@@ -122,7 +122,7 @@ Fragment#0 root
Project
CorrelatedNestedLoopJoin
Receiver(sourceFragment=1, exchange=1, distribution=single)
- ColocatedHashAggregate
+ ReduceHashAggregate
Receiver(sourceFragment=2, exchange=2, distribution=single)
Fragment#2 correlated
@@ -133,7 +133,8 @@ Fragment#2 correlated
pruningMetadata: [3=[{0=$cor0.ID}]]
tree:
Sender(targetFragment=0, exchange=2, distribution=single)
- TableScan(name=PUBLIC.T3_N1N2N3, source=3, partitions=3,
distribution=random)
+ MapHashAggregate
+ TableScan(name=PUBLIC.T3_N1N2N3, source=3, partitions=3,
distribution=random)
Fragment#1
targetNodes: [N0]
@@ -158,7 +159,7 @@ Fragment#0 root
Project
CorrelatedNestedLoopJoin
Receiver(sourceFragment=1, exchange=1, distribution=single)
- ColocatedHashAggregate
+ ReduceHashAggregate
Receiver(sourceFragment=2, exchange=2, distribution=single)
Fragment#2 correlated
@@ -169,7 +170,8 @@ Fragment#2 correlated
pruningMetadata: [3=[{0=$cor0.ID}]]
tree:
Sender(targetFragment=0, exchange=2, distribution=single)
- TableScan(name=PUBLIC.T2_N4N5, source=3, partitions=2,
distribution=random)
+ MapHashAggregate
+ TableScan(name=PUBLIC.T2_N4N5, source=3, partitions=2,
distribution=random)
Fragment#1
targetNodes: [N0]
diff --git a/modules/sql-engine/src/test/resources/mapping/union.test
b/modules/sql-engine/src/test/resources/mapping/union.test
index 9b58561ed5..1ab8915466 100644
--- a/modules/sql-engine/src/test/resources/mapping/union.test
+++ b/modules/sql-engine/src/test/resources/mapping/union.test
@@ -4,32 +4,23 @@ SELECT /*+
DISABLE_RULE('ColocatedHashAggregateConverterRule')*/ * FROM
---
Fragment#0 root
executionNodes: [N1]
- remoteFragments: [1, 2]
- exchangeSourceNodes: {1=[N1], 2=[N1]}
+ remoteFragments: [1]
+ exchangeSourceNodes: {1=[N1]}
tree:
ReduceHashAggregate
- MapHashAggregate
- UnionAll
- Receiver(sourceFragment=1, exchange=1, distribution=single)
- Receiver(sourceFragment=2, exchange=2, distribution=single)
-
-Fragment#2
- targetNodes: [N1]
- executionNodes: [N1]
- tables: [T2_N1]
- partitions: {N1=[0:1]}
- tree:
- Sender(targetFragment=0, exchange=2, distribution=single)
- TableScan(name=PUBLIC.T2_N1, source=3, partitions=1,
distribution=affinity[table: T2_N1, columns: [ID]])
+ Receiver(sourceFragment=1, exchange=1, distribution=single)
Fragment#1
targetNodes: [N1]
executionNodes: [N1]
- tables: [T1_N1]
+ tables: [T1_N1, T2_N1]
partitions: {N1=[0:1]}
tree:
Sender(targetFragment=0, exchange=1, distribution=single)
- TableScan(name=PUBLIC.T1_N1, source=4, partitions=1,
distribution=affinity[table: T1_N1, columns: [ID]])
+ MapHashAggregate
+ UnionAll
+ TableScan(name=PUBLIC.T1_N1, source=2, partitions=1,
distribution=affinity[table: T1_N1, columns: [ID]])
+ TableScan(name=PUBLIC.T2_N1, source=3, partitions=1,
distribution=affinity[table: T2_N1, columns: [ID]])
---
N1
@@ -62,32 +53,23 @@ SELECT /*+
DISABLE_RULE('ColocatedHashAggregateConverterRule')*/ * FROM
---
Fragment#0 root
executionNodes: [N1]
- remoteFragments: [1, 2]
- exchangeSourceNodes: {1=[N1, N2], 2=[N1, N2]}
+ remoteFragments: [1]
+ exchangeSourceNodes: {1=[N1, N2]}
tree:
ReduceHashAggregate
- MapHashAggregate
- UnionAll
- Receiver(sourceFragment=1, exchange=1, distribution=single)
- Receiver(sourceFragment=2, exchange=2, distribution=single)
-
-Fragment#2
- targetNodes: [N1]
- executionNodes: [N1, N2]
- tables: [T2_N1N2]
- partitions: {N1=[0:2], N2=[1:2]}
- tree:
- Sender(targetFragment=0, exchange=2, distribution=single)
- TableScan(name=PUBLIC.T2_N1N2, source=3, partitions=2,
distribution=affinity[table: T2_N1N2, columns: [ID]])
+ Receiver(sourceFragment=1, exchange=1, distribution=single)
Fragment#1
targetNodes: [N1]
executionNodes: [N1, N2]
- tables: [T1_N1N2]
+ tables: [T1_N1N2, T2_N1N2]
partitions: {N1=[0:2], N2=[1:2]}
tree:
Sender(targetFragment=0, exchange=1, distribution=single)
- TableScan(name=PUBLIC.T1_N1N2, source=4, partitions=2,
distribution=affinity[table: T1_N1N2, columns: [ID]])
+ MapHashAggregate
+ UnionAll
+ TableScan(name=PUBLIC.T1_N1N2, source=2, partitions=2,
distribution=affinity[table: T1_N1N2, columns: [ID]])
+ TableScan(name=PUBLIC.T2_N1N2, source=3, partitions=2,
distribution=affinity[table: T2_N1N2, columns: [ID]])
---
N1
diff --git a/modules/sql-engine/src/test/resources/tpch/plan/q1.plan
b/modules/sql-engine/src/test/resources/tpch/plan/q1.plan
new file mode 100644
index 0000000000..af8defab87
--- /dev/null
+++ b/modules/sql-engine/src/test/resources/tpch/plan/q1.plan
@@ -0,0 +1,7 @@
+Sort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC])
+ Project(inputs=[0..5], exprs=[[DECIMAL_DIVIDE($6, $7, 15, 2), DECIMAL_DIVIDE($8, $9, 15, 2), DECIMAL_DIVIDE($10, $11, 15, 2), CAST($12):BIGINT NOT NULL]])
+ ReduceHashAggregate(group=[{0, 1}], SUM_QTY=[SUM($2)], SUM_BASE_PRICE=[SUM($3)], SUM_DISC_PRICE=[SUM($4)], SUM_CHARGE=[SUM($5)], AVG_SUM6=[SUM($6)], AVG_SUM06=[$SUM0($7)], AVG_SUM8=[SUM($8)], AVG_SUM08=[$SUM0($9)], AVG_SUM10=[SUM($10)], AVG_SUM010=[$SUM0($11)], COUNT_12_MAP_SUM=[$SUM0($12)])
+ Project(inputs=[0..6], exprs=[[CAST($7):DECIMAL(32767, 0) NOT NULL, $8, CAST($9):DECIMAL(32767, 0) NOT NULL, $10, CAST($11):DECIMAL(32767, 0) NOT NULL, $12, $13]])
+ Exchange(distribution=[single])
+ MapHashAggregate(group=[{0, 1}], SUM_QTY=[SUM($2)], SUM_BASE_PRICE=[SUM($3)], SUM_DISC_PRICE=[SUM($4)], SUM_CHARGE=[SUM($5)], AVG_SUM6=[SUM($2)], AVG_COUNT6=[COUNT($2)], AVG_SUM8=[SUM($3)], AVG_COUNT8=[COUNT($3)], AVG_SUM10=[SUM($6)], AVG_COUNT10=[COUNT($6)], COUNT_ORDER=[COUNT()])
+ IndexScan(table=[[PUBLIC, LINEITEM]], index=[L_SD], type=[SORTED], searchBounds=[[RangeBounds [lowerBound=null, upperBound=-(1998-12-01, 7776000000:INTERVAL DAY), lowerInclude=true, upperInclude=true]]], filters=[<=($t6, -(1998-12-01, 7776000000:INTERVAL DAY))], projects=[[$t4, $t5, $t0, $t1, *($t1, -(1, $t2)), *(*($t1, -(1, $t2)), +(1, $t3)), $t2]], requiredColumns=[{4, 5, 6, 7, 8, 9, 10}], collation=[[10]])