This is an automated email from the ASF dual-hosted git repository.
korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 8e053c0b39 IGNITE-21668 Sql. Disable order-preserving multi target
exchange (#3356)
8e053c0b39 is described below
commit 8e053c0b39ca0f74fd44826202e3298e8b803f6f
Author: korlov42 <[email protected]>
AuthorDate: Wed Mar 6 15:57:14 2024 +0200
IGNITE-21668 Sql. Disable order-preserving multi target exchange (#3356)
---
.../internal/sql/engine/hint/IgniteHint.java | 10 +--
.../internal/sql/engine/prepare/PlannerHelper.java | 81 ------------------
.../internal/sql/engine/prepare/PlannerPhase.java | 3 +
.../ignite/internal/sql/engine/rel/IgniteSort.java | 2 +-
.../rel/agg/IgniteColocatedAggregateBase.java | 4 +
.../sql/engine/rule/MergeJoinConverterRule.java | 4 +-
.../engine/rule/SortAggregateConverterRule.java | 5 +-
.../sql/engine/rule/SortExchangeTransposeRule.java | 98 ++++++++++++++++++++++
.../internal/sql/engine/trait/TraitUtils.java | 5 ++
.../ignite/internal/sql/engine/util/Commons.java | 6 +-
.../ignite/internal/sql/engine/util/HintUtils.java | 11 ---
.../sql/engine/planner/AggregatePlannerTest.java | 13 +--
.../planner/ColocatedSortAggregatePlannerTest.java | 77 +++++++++++++----
.../planner/IdentityDistributionPlannerTest.java | 26 +++---
.../engine/planner/JoinColocationPlannerTest.java | 64 ++++++--------
.../planner/MapReduceSortAggregatePlannerTest.java | 79 ++++++++++++++---
16 files changed, 287 insertions(+), 201 deletions(-)
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/hint/IgniteHint.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/hint/IgniteHint.java
index 46902527f3..ebda54d4dc 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/hint/IgniteHint.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/hint/IgniteHint.java
@@ -34,15 +34,7 @@ public enum IgniteHint {
/** Disables index usage. **/
NO_INDEX(true),
/** Forces index usage. */
- FORCE_INDEX(true),
- /**
- * Disables sorted algorithm for node this hint specified on.
- *
- * <p>At the moment, this hint is introduced as workaround to the problem,
when sorting
- * on correlated path may result in deadlock. Assigned by optimizer,
should not be specified
- * explicitly in the query.
- */
- DISABLE_SORTED_ALGORITHM;
+ FORCE_INDEX(true);
private final boolean paramSupport;
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerHelper.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerHelper.java
index c4bdf1c1ce..728e4b12f3 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerHelper.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerHelper.java
@@ -31,10 +31,7 @@ import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelHomogeneousShuttle;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
-import org.apache.calcite.rel.hint.RelHint;
-import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalCorrelate;
-import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.rules.CoreRules;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
@@ -43,7 +40,6 @@ import org.apache.calcite.util.Pair;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.sql.engine.hint.Hints;
-import org.apache.ignite.internal.sql.engine.hint.IgniteHint;
import org.apache.ignite.internal.sql.engine.rel.IgniteConvention;
import org.apache.ignite.internal.sql.engine.rel.IgniteProject;
import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
@@ -105,8 +101,6 @@ public final class PlannerHelper {
rel = planner.trimUnusedFields(root.withRel(rel)).rel;
- rel = new SortedAlgorithmDisabler().visit(rel);
-
boolean amountOfJoinsAreBig = hasTooMuchJoins(rel);
boolean enforceJoinOrder = hints.present(ENFORCE_JOIN_ORDER);
if (amountOfJoinsAreBig || enforceJoinOrder) {
@@ -216,79 +210,4 @@ public final class PlannerHelper {
return Math.max(countOfSources, maxCountOfSourcesInSubQuery);
}
}
-
- /**
- * Traverses relational tree and assign {@link
IgniteHint#DISABLE_SORTED_ALGORITHM} hint
- * to every {@link LogicalAggregate} and {@link LogicalJoin} node on right
side of the
- * {@link LogicalCorrelate} node.
- *
- * <p>This is workaround of a deadlock caused by preserving sorting on a
correlated path.
- *
- * <pre>
- * Legend:
- * [f#1] -- fragment #1
- * (n#1) -- node #1
- * [f#2 (n#1) (n#2) ] -- fragment #2 mapped on nodes #1 and #2
- *
- * Problematic subtree:
- * [f#1 (n#1) ]
- * / \
- * [f#2 (n#1) (n#2) ]
- * | X |
- * [f#3 (n#1) (n#2) ]
- *
- * Problematic sequence of events:
- * 1) f#1n#1 sets the correlated variable and requests data from f#2.
- * 2) Request messages arrives at n#1 and n#2 accordingly. Since they
are the first
- * requests, the nodes start processing immediately.
- * 3) Processing of f#2 requires data to be pulled from f#3, thus
requests will be sent.
- * 4) Here comes the first factor: fragment may process only one
correlated request at a time.
- * So, f#3 start processing of request from f#2n#1 immediately, while
request from f#2n#2 will
- * be postponed.
- * 5) f#3 prepares first batch of data, sends it to the f#2n#1 and
waits until the next batch will be
- * requested. Request from f#2n#2 still postponed until all data for
correlate from f#2n#1 will be processed.
- * 6) f#2n#1 propagates received batch to f#1n#1 and waits for the
next request.
- * 7) Here comes the second and final factor: since f#1n#1 expects
data to be sorted, exchange between
- * f#1 and f#2 will waits for batches from all parties (f#2n#1 and
f#2n#2 in our case).
- *
- * </pre>
- */
- private static class SortedAlgorithmDisabler extends RelHomogeneousShuttle
{
- private static final RelHint DISABLE_SORTED_ALGORITHM =
RelHint.builder(IgniteHint.DISABLE_SORTED_ALGORITHM.name()).build();
-
- private int correlated = 0;
-
- @Override
- public RelNode visit(LogicalAggregate aggregate) {
- if (correlated > 0) {
- List<RelHint> hints = new ArrayList<>(aggregate.getHints());
- hints.add(DISABLE_SORTED_ALGORITHM);
- aggregate = (LogicalAggregate) aggregate.withHints(hints);
- }
-
- return super.visit(aggregate);
- }
-
- @Override
- public RelNode visit(LogicalJoin join) {
- if (correlated > 0) {
- List<RelHint> hints = new ArrayList<>(join.getHints());
- hints.add(DISABLE_SORTED_ALGORITHM);
- join = (LogicalJoin) join.withHints(hints);
- }
-
- return super.visit(join);
- }
-
- @Override
- public RelNode visit(LogicalCorrelate correlate) {
- RelNode output = visitChild(correlate, 0, correlate.getInput(0));
-
- correlated++;
- output = visitChild(output, 1, correlate.getInput(1));
- correlated--;
-
- return output;
- }
- }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
index 120a12014b..48d6344826 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
@@ -57,6 +57,7 @@ import
org.apache.ignite.internal.sql.engine.rule.ProjectConverterRule;
import org.apache.ignite.internal.sql.engine.rule.SetOpConverterRule;
import org.apache.ignite.internal.sql.engine.rule.SortAggregateConverterRule;
import org.apache.ignite.internal.sql.engine.rule.SortConverterRule;
+import org.apache.ignite.internal.sql.engine.rule.SortExchangeTransposeRule;
import
org.apache.ignite.internal.sql.engine.rule.TableFunctionScanConverterRule;
import org.apache.ignite.internal.sql.engine.rule.TableModifyConverterRule;
import org.apache.ignite.internal.sql.engine.rule.TableModifyToKeyValuePutRule;
@@ -184,6 +185,8 @@ public enum PlannerPhase {
b.operand(LogicalSort.class)
.anyInputs()).toRule(),
+ SortExchangeTransposeRule.INSTANCE,
+
CoreRules.UNION_MERGE,
CoreRules.MINUS_MERGE,
CoreRules.INTERSECT_MERGE,
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteSort.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteSort.java
index 36777dfaff..fbdcd3183e 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteSort.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteSort.java
@@ -168,7 +168,7 @@ public class IgniteSort extends Sort implements IgniteRel {
// Distributed sorting is more preferable than sorting on the single
node.
if (TraitUtils.distributionEnabled(this) &&
TraitUtils.distribution(traitSet).satisfies(IgniteDistributions.single())) {
- cost.plus(costFactory.makeTinyCost());
+ cost = cost.plus(costFactory.makeTinyCost());
}
return cost;
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/IgniteColocatedAggregateBase.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/IgniteColocatedAggregateBase.java
index 15a020d8b1..565193b70b 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/IgniteColocatedAggregateBase.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/IgniteColocatedAggregateBase.java
@@ -103,6 +103,10 @@ public abstract class IgniteColocatedAggregateBase extends
IgniteAggregate imple
IgniteDistribution newOutDistribution =
newInDistribution.apply(Commons.trimmingMapping(rowType.getFieldCount(),
groupSet));
return List.of(
+ Pair.of(
+ nodeTraits.replace(IgniteDistributions.single()),
+
List.of(inputTraits.get(0).replace(IgniteDistributions.single()))
+ ),
Pair.of(
nodeTraits.replace(newOutDistribution),
List.of(inputTraits.get(0).replace(newInDistribution))
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/MergeJoinConverterRule.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/MergeJoinConverterRule.java
index c690e9b421..72b60eb258 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/MergeJoinConverterRule.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/MergeJoinConverterRule.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.sql.engine.rule;
-import static
org.apache.ignite.internal.sql.engine.util.HintUtils.isSortedAlgorithmAllowed;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
import org.apache.calcite.plan.RelOptCluster;
@@ -53,8 +52,7 @@ public class MergeJoinConverterRule extends
AbstractIgniteConverterRule<LogicalJ
LogicalJoin logicalJoin = call.rel(0);
return !nullOrEmpty(logicalJoin.analyzeCondition().pairs())
- && logicalJoin.analyzeCondition().isEqui()
- && isSortedAlgorithmAllowed(logicalJoin);
+ && logicalJoin.analyzeCondition().isEqui();
}
/** {@inheritDoc} */
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/SortAggregateConverterRule.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/SortAggregateConverterRule.java
index 5134d9ec2e..6dd3c82009 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/SortAggregateConverterRule.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/SortAggregateConverterRule.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.sql.engine.rule;
import static
org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates.canBeImplementedAsMapReduce;
-import static
org.apache.ignite.internal.sql.engine.util.HintUtils.isSortedAlgorithmAllowed;
import static
org.apache.ignite.internal.sql.engine.util.PlanUtils.complexDistinctAgg;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
@@ -73,8 +72,7 @@ public class SortAggregateConverterRule {
LogicalAggregate aggregate = call.rel(0);
return !HintUtils.isExpandDistinctAggregate(aggregate)
- && aggregate.getGroupSets().size() == 1
- && isSortedAlgorithmAllowed(aggregate);
+ && aggregate.getGroupSets().size() == 1;
}
/** {@inheritDoc} */
@@ -117,7 +115,6 @@ public class SortAggregateConverterRule {
return !HintUtils.isExpandDistinctAggregate(aggregate)
&& (nullOrEmpty(aggregate.getGroupSet()) ||
aggregate.getGroupSets().size() == 1)
- && isSortedAlgorithmAllowed(aggregate)
&& canBeImplementedAsMapReduce(aggregate.getAggCallList())
&& !complexDistinctAgg(aggregate.getAggCallList());
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/SortExchangeTransposeRule.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/SortExchangeTransposeRule.java
new file mode 100644
index 0000000000..fbf714a436
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/SortExchangeTransposeRule.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.rule;
+
+import static
org.apache.ignite.internal.sql.engine.trait.IgniteDistributions.single;
+import static
org.apache.ignite.internal.sql.engine.trait.TraitUtils.distribution;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelDistribution.Type;
+import org.apache.calcite.rel.RelNode;
+import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
+import org.apache.ignite.internal.sql.engine.rel.IgniteSort;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
+import org.immutables.value.Value;
+
+/**
+ * A rule that pushes {@link IgniteSort} node under {@link IgniteExchange}.
+ */
[email protected]
+public class SortExchangeTransposeRule extends
RelRule<SortExchangeTransposeRule.Config> {
+ public static final RelOptRule INSTANCE = Config.INSTANCE.toRule();
+
+ private SortExchangeTransposeRule(Config cfg) {
+ super(cfg);
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ IgniteExchange exchange = call.rel(1);
+
+ return hashAlike(distribution(exchange.getInput()))
+ && exchange.distribution() == single();
+ }
+
+ private static boolean hashAlike(IgniteDistribution distribution) {
+ return distribution.getType() == Type.HASH_DISTRIBUTED
+ || distribution.getType() == Type.RANDOM_DISTRIBUTED;
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ IgniteSort sort = call.rel(0);
+ IgniteExchange exchange = call.rel(1);
+
+ RelOptCluster cluster = sort.getCluster();
+ RelCollation collation = sort.collation();
+ RelNode input = exchange.getInput();
+
+ call.transformTo(
+ new IgniteExchange(
+ cluster,
+ exchange.getTraitSet()
+ .replace(collation),
+ convert(input, input.getTraitSet().replace(collation)),
+ exchange.distribution()
+ )
+ );
+ }
+
+ /** Configuration. */
+ @SuppressWarnings({"ClassNameSameAsAncestorName",
"InnerClassFieldHidesOuterClassField"})
+ @Value.Immutable
+ public interface Config extends RelRule.Config {
+ Config INSTANCE = ImmutableSortExchangeTransposeRule.Config.of()
+ .withDescription("SortExchangeTransposeRule")
+ .withOperandSupplier(o0 ->
+ o0.operand(IgniteSort.class)
+ .oneInput(o1 ->
+ o1.operand(IgniteExchange.class)
+ .anyInputs()))
+ .as(Config.class);
+
+ /** {@inheritDoc} */
+ @Override
+ default SortExchangeTransposeRule toRule() {
+ return new SortExchangeTransposeRule(this);
+ }
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
index d9fed6570f..638060a75a 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
@@ -22,6 +22,7 @@ import static java.util.Collections.singletonList;
import static org.apache.calcite.plan.RelOptUtil.permutationPushDownProject;
import static
org.apache.calcite.rel.RelDistribution.Type.BROADCAST_DISTRIBUTED;
import static org.apache.calcite.rel.RelDistribution.Type.HASH_DISTRIBUTED;
+import static org.apache.calcite.rel.RelDistribution.Type.SINGLETON;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.internal.util.CollectionUtils.first;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
@@ -165,6 +166,10 @@ public class TraitUtils {
if (fromTrait.getType() == BROADCAST_DISTRIBUTED && toTrait.getType()
== HASH_DISTRIBUTED) {
return new IgniteTrimExchange(rel.getCluster(), traits, rel,
toTrait);
} else {
+ if (toTrait.getType() != SINGLETON && collation(traits) !=
RelCollations.EMPTY) {
+ return null;
+ }
+
return new IgniteExchange(
rel.getCluster(),
traits,
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
index d442d95e59..d4e5890639 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
@@ -63,7 +63,6 @@ import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelTraitDef;
import org.apache.calcite.rel.RelCollationTraitDef;
import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.hint.HintPredicates;
import org.apache.calcite.rel.hint.HintStrategyTable;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.sql.SqlKind;
@@ -138,8 +137,8 @@ public final class Commons {
@SuppressWarnings("rawtypes")
public static final List<RelTraitDef> DISTRIBUTED_TRAITS_SET = List.of(
ConventionTraitDef.INSTANCE,
- RelCollationTraitDef.INSTANCE,
- DistributionTraitDef.INSTANCE
+ DistributionTraitDef.INSTANCE,
+ RelCollationTraitDef.INSTANCE
);
public static final FrameworkConfig FRAMEWORK_CONFIG =
Frameworks.newConfigBuilder()
@@ -159,7 +158,6 @@ public final class Commons {
.hintStrategy(IgniteHint.EXPAND_DISTINCT_AGG.name(), AGGREGATE)
.hintStrategy(IgniteHint.NO_INDEX.name(),
(hint, rel) -> rel instanceof IgniteLogicalTableScan)
.hintStrategy(IgniteHint.FORCE_INDEX.name(), (hint, rel) -> rel instanceof
IgniteLogicalTableScan)
-
.hintStrategy(IgniteHint.DISABLE_SORTED_ALGORITHM.name(),
HintPredicates.and(JOIN, AGGREGATE))
.build()
)
)
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HintUtils.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HintUtils.java
index 864aa7bb9f..1f5770097b 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HintUtils.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HintUtils.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.sql.engine.util;
-import static
org.apache.ignite.internal.sql.engine.hint.IgniteHint.DISABLE_SORTED_ALGORITHM;
import static
org.apache.ignite.internal.sql.engine.hint.IgniteHint.EXPAND_DISTINCT_AGG;
import java.util.Arrays;
@@ -83,14 +82,4 @@ public class HintUtils {
return rel.getCluster().getHintStrategies()
.apply(hintList, rel);
}
-
- /**
- * Return {@code true} if {@link IgniteHint#DISABLE_SORTED_ALGORITHM} hint
is not presented in provided relational node.
- *
- * @param rel Relation of interest.
- */
- public static <T extends RelNode & Hintable> boolean
isSortedAlgorithmAllowed(T rel) {
- return rel.getHints().stream()
- .noneMatch(r ->
r.hintName.equals(DISABLE_SORTED_ALGORITHM.name()));
- }
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
index 3e01245939..32363f8cb2 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.sql.engine.planner;
import static java.util.function.Predicate.not;
+import static
org.apache.ignite.internal.sql.engine.trait.IgniteDistributions.single;
import java.util.List;
import java.util.Objects;
@@ -37,7 +38,6 @@ import
org.apache.ignite.internal.sql.engine.rel.agg.IgniteMapHashAggregate;
import org.apache.ignite.internal.sql.engine.rel.agg.IgniteMapSortAggregate;
import org.apache.ignite.internal.sql.engine.rel.agg.IgniteReduceHashAggregate;
import org.apache.ignite.internal.sql.engine.rel.agg.IgniteReduceSortAggregate;
-import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
import org.junit.jupiter.api.Test;
@@ -479,7 +479,7 @@ public class AggregatePlannerTest extends
AbstractAggregatePlannerTest {
));
Predicate<IgniteExchange> colocatedGroupBy =
isInstanceOf(IgniteExchange.class)
- .and(hasDistribution(IgniteDistributions.single()))
+ .and(hasDistribution(single()))
.and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
.and(in ->
hasAggregates(countMap).test(in.getAggCallList()))
.and(input(isTableScan("TEST")))
@@ -500,10 +500,10 @@ public class AggregatePlannerTest extends
AbstractAggregatePlannerTest {
Predicate<IgniteColocatedHashAggregate> nonColocated =
isInstanceOf(IgniteColocatedHashAggregate.class)
.and(in -> hasAggregates(countMap).test(in.getAggCallList()))
.and(input(isInstanceOf(IgniteExchange.class)
- .and(hasDistribution(IgniteDistributions.single()))));
+ .and(hasDistribution(single()))));
Predicate<IgniteExchange> colocatedGroupBy =
isInstanceOf(IgniteExchange.class)
- .and(hasDistribution(IgniteDistributions.single()))
+ .and(hasDistribution(single()))
.and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
.and(in ->
hasAggregates(countMap).test(in.getAggCallList()))
.and(input(isTableScan("TEST")))
@@ -539,7 +539,7 @@ public class AggregatePlannerTest extends
AbstractAggregatePlannerTest {
Predicate<RelNode> colocated =
nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
.and(hasNoGroupSets(IgniteReduceSortAggregate::getGroupSets))
.and(input(isInstanceOf(IgniteExchange.class)
- .and(hasDistribution(IgniteDistributions.single())
+ .and(hasDistribution(single())
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
.and(hasNoGroupSets(IgniteMapSortAggregate::getGroupSets))
.and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
@@ -802,6 +802,7 @@ public class AggregatePlannerTest extends
AbstractAggregatePlannerTest {
assertPlan(testCase,
isInstanceOf(IgniteColocatedSortAggregate.class)
.and(input(isInstanceOf(IgniteExchange.class)
+ .and(hasDistribution(single()))
.and(input(isInstanceOf(IgniteSort.class)
.and(s ->
s.collation().equals(collation))
.and(input(isTableScan("TEST")))
@@ -815,7 +816,7 @@ public class AggregatePlannerTest extends
AbstractAggregatePlannerTest {
.and(nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class)
.and(hasGroupSets(IgniteReduceHashAggregate::getGroupSets, 0))
.and(input(isInstanceOf(IgniteExchange.class)
-
.and(hasDistribution(IgniteDistributions.single())
+ .and(hasDistribution(single())
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
.and(hasGroupSets(IgniteMapHashAggregate::getGroupSets, 1))
))
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java
index 7f533e86bb..5d6c01439e 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java
@@ -18,9 +18,7 @@
package org.apache.ignite.internal.sql.engine.planner;
import static java.util.function.Predicate.not;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.containsString;
-import static org.junit.jupiter.api.Assertions.assertThrows;
+import static
org.apache.ignite.internal.sql.engine.trait.IgniteDistributions.single;
import java.util.List;
import java.util.Objects;
@@ -28,13 +26,13 @@ import java.util.function.Predicate;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.AggregateCall;
+import
org.apache.ignite.internal.sql.engine.rel.IgniteCorrelatedNestedLoopJoin;
import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
+import org.apache.ignite.internal.sql.engine.rel.IgniteLimit;
import org.apache.ignite.internal.sql.engine.rel.IgniteMergeJoin;
-import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
import org.apache.ignite.internal.sql.engine.rel.IgniteSort;
import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
import
org.apache.ignite.internal.sql.engine.rel.agg.IgniteColocatedSortAggregate;
-import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
import org.apache.ignite.internal.util.ArrayUtils;
import org.junit.jupiter.api.Test;
@@ -272,6 +270,7 @@ public class ColocatedSortAggregatePlannerTest extends
AbstractAggregatePlannerT
not(nodeOrAnyChild(isInstanceOf(IgniteSort.class)))
.and(nodeOrAnyChild(input(1,
isInstanceOf(IgniteColocatedSortAggregate.class)
.and(input(isInstanceOf(IgniteExchange.class)
+ .and(hasDistribution(single()))
.and(input(isIndexScan("TEST",
"idx_val0")))
))
))
@@ -282,6 +281,7 @@ public class ColocatedSortAggregatePlannerTest extends
AbstractAggregatePlannerT
not(nodeOrAnyChild(isInstanceOf(IgniteSort.class)))
.and(nodeOrAnyChild(input(1,
isInstanceOf(IgniteColocatedSortAggregate.class)
.and(input(isInstanceOf(IgniteExchange.class)
+ .and(hasDistribution(single()))
.and(input(isIndexScan("TEST",
"idx_val0")))
))
))
@@ -295,17 +295,49 @@ public class ColocatedSortAggregatePlannerTest extends
AbstractAggregatePlannerT
*/
@Test
public void emptyCollationPassThroughLimit() throws Exception {
- RuntimeException e = assertThrows(RuntimeException.class,
- () -> assertPlan(TestCase.CASE_17,
isInstanceOf(IgniteRel.class), disableRules));
- assertThat(e.getMessage(), containsString("There are not enough rules
to produce a node with desired properties"));
-
- e = assertThrows(RuntimeException.class,
- () -> assertPlan(TestCase.CASE_17A,
isInstanceOf(IgniteRel.class), disableRules));
- assertThat(e.getMessage(), containsString("There are not enough rules
to produce a node with desired properties"));
+ assertPlan(TestCase.CASE_17,
+ hasChildThat(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class)
+ .and(input(1,
isInstanceOf(IgniteColocatedSortAggregate.class)
+ .and(input(isInstanceOf(IgniteLimit.class)
+
.and(input(isInstanceOf(IgniteSort.class)
+
.and(input(isTableScan("TEST")))
+ ))
+ ))
+ ))
+ ),
+ disableRules
+ );
- e = assertThrows(RuntimeException.class,
- () -> assertPlan(TestCase.CASE_17B,
isInstanceOf(IgniteRel.class), disableRules));
- assertThat(e.getMessage(), containsString("There are not enough rules
to produce a node with desired properties"));
+ assertPlan(TestCase.CASE_17A,
+ hasChildThat(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class)
+ .and(input(1,
isInstanceOf(IgniteColocatedSortAggregate.class)
+ .and(input(isInstanceOf(IgniteLimit.class)
+
.and(input(isInstanceOf(IgniteExchange.class)
+ .and(hasDistribution(single()))
+
.and(input(isInstanceOf(IgniteSort.class)
+
.and(input(isTableScan("TEST")))
+ ))
+ ))
+ ))
+ ))
+ ),
+ disableRules
+ );
+ assertPlan(TestCase.CASE_17B,
+ hasChildThat(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class)
+ .and(input(1,
isInstanceOf(IgniteColocatedSortAggregate.class)
+ .and(input(isInstanceOf(IgniteLimit.class)
+
.and(input(isInstanceOf(IgniteExchange.class)
+ .and(hasDistribution(single()))
+
.and(input(isInstanceOf(IgniteSort.class)
+
.and(input(isTableScan("TEST")))
+ ))
+ ))
+ ))
+ ))
+ ),
+ disableRules
+ );
}
/**
@@ -408,7 +440,7 @@ public class ColocatedSortAggregatePlannerTest extends
AbstractAggregatePlannerT
Predicate<IgniteColocatedSortAggregate> checkPlan =
isInstanceOf(IgniteColocatedSortAggregate.class)
.and(in -> hasAggregates(countMap).test(in.getAggCallList()))
.and(input(isInstanceOf(IgniteExchange.class)
- .and(hasDistribution(IgniteDistributions.single()))));
+ .and(hasDistribution(single()))));
assertPlan(TestCase.CASE_22, checkPlan, disableRules);
assertPlan(TestCase.CASE_22A, checkPlan, disableRules);
@@ -426,7 +458,7 @@ public class ColocatedSortAggregatePlannerTest extends
AbstractAggregatePlannerT
Predicate<IgniteColocatedSortAggregate> checkPlan =
isInstanceOf(IgniteColocatedSortAggregate.class)
.and(in -> hasAggregates(countMap).test(in.getAggCallList()))
.and(input(isInstanceOf(IgniteExchange.class)
- .and(hasDistribution(IgniteDistributions.single()))));
+ .and(hasDistribution(single()))));
assertPlan(TestCase.CASE_23, checkPlan, disableRules);
assertPlan(TestCase.CASE_23A, checkPlan, disableRules);
@@ -500,6 +532,7 @@ public class ColocatedSortAggregatePlannerTest extends
AbstractAggregatePlannerT
.and(hasAggregate())
.and(hasGroups())
.and(input(isInstanceOf(IgniteExchange.class)
+ .and(hasDistribution(single()))
.and(input(isInstanceOf(IgniteSort.class)
.and(input(isTableScan("TEST")))
))
@@ -550,6 +583,7 @@ public class ColocatedSortAggregatePlannerTest extends
AbstractAggregatePlannerT
.and(hasDistinctAggregate())
.and(hasGroups())
.and(input(isInstanceOf(IgniteExchange.class)
+ .and(hasDistribution(single()))
.and(input(isInstanceOf(IgniteSort.class)
.and(input(isTableScan("TEST")))
))
@@ -574,6 +608,7 @@ public class ColocatedSortAggregatePlannerTest extends
AbstractAggregatePlannerT
nodeOrAnyChild(isInstanceOf(IgniteColocatedSortAggregate.class)
.and(hasAggregate())
.and(input(isInstanceOf(IgniteExchange.class)
+ .and(hasDistribution(single()))
.and(input(isIndexScan("TEST",
"idx_grp0_grp1")))
))
),
@@ -584,6 +619,7 @@ public class ColocatedSortAggregatePlannerTest extends
AbstractAggregatePlannerT
private void checkAggWithColocatedGroupByIndexColumnsHash(TestCase
testCase) throws Exception {
assertPlan(testCase,
nodeOrAnyChild(isInstanceOf(IgniteExchange.class)
+ .and(hasDistribution(single()))
.and(nodeOrAnyChild(isInstanceOf(IgniteColocatedSortAggregate.class)
.and(hasAggregate())
.and(input(isIndexScan("TEST",
"idx_grp0_grp1")))
@@ -623,6 +659,7 @@ public class ColocatedSortAggregatePlannerTest extends
AbstractAggregatePlannerT
.and(not(hasAggregate()))
.and(hasGroups())
.and(input(isInstanceOf(IgniteExchange.class)
+ .and(hasDistribution(single()))
.and(input(isIndexScan("TEST",
"idx_grp0_grp1")))
))
),
@@ -633,6 +670,7 @@ public class ColocatedSortAggregatePlannerTest extends
AbstractAggregatePlannerT
private void checkColocatedGroupWithNoAggregateUseIndexHash(TestCase
testCase) throws Exception {
assertPlan(testCase,
nodeOrAnyChild(isInstanceOf(IgniteExchange.class)
+ .and(hasDistribution(single()))
.and(input(isInstanceOf(IgniteColocatedSortAggregate.class)
.and(not(hasAggregate()))
.and(hasGroups())
@@ -649,6 +687,7 @@ public class ColocatedSortAggregatePlannerTest extends
AbstractAggregatePlannerT
.and(not(hasAggregate()))
.and(hasGroups())
.and(input(isInstanceOf(IgniteExchange.class)
+ .and(hasDistribution(single()))
.and(input(isInstanceOf(IgniteSort.class)
.and(input(isTableScan("TEST")))
))
@@ -684,6 +723,7 @@ public class ColocatedSortAggregatePlannerTest extends
AbstractAggregatePlannerT
assertPlan(testCase,
isInstanceOf(IgniteColocatedSortAggregate.class)
.and(input(isInstanceOf(IgniteExchange.class)
+ .and(hasDistribution(single()))
.and(input(isInstanceOf(IgniteSort.class)
.and(s ->
s.collation().equals(collation))
.and(input(isTableScan("TEST")))
@@ -713,6 +753,7 @@ public class ColocatedSortAggregatePlannerTest extends
AbstractAggregatePlannerT
.and(s ->
s.collation().equals(TraitUtils.createCollation(List.of(0, 1, 2))))
.and(input(isInstanceOf(IgniteColocatedSortAggregate.class)
.and(input(isInstanceOf(IgniteExchange.class)
+ .and(hasDistribution(single()))
.and(input(isInstanceOf(IgniteSort.class)
.and(s ->
s.collation().equals(TraitUtils.createCollation(List.of(0, 1))))
.and(input(isTableScan("TEST")))
@@ -727,7 +768,7 @@ public class ColocatedSortAggregatePlannerTest extends
AbstractAggregatePlannerT
assertPlan(testCase, isInstanceOf(IgniteColocatedSortAggregate.class)
.and(hasNoGroupSets(IgniteColocatedSortAggregate::getGroupSets))
.and(input(isInstanceOf(IgniteExchange.class)
- .and(hasDistribution(IgniteDistributions.single()))
+ .and(hasDistribution(single()))
.and(input(isInstanceOf(IgniteTableScan.class))))
), disableRules);
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/IdentityDistributionPlannerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/IdentityDistributionPlannerTest.java
index aa6043f33c..7b92276159 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/IdentityDistributionPlannerTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/IdentityDistributionPlannerTest.java
@@ -77,14 +77,13 @@ public class IdentityDistributionPlannerTest extends
AbstractPlannerTest {
+ "from TEST_TBL1 t1 "
+ "join TEST_TBL2 t2 on t1.id = t2.id";
- assertPlan(sql, schema,
nodeOrAnyChild(isInstanceOf(IgniteExchange.class)
+ assertPlan(sql, schema,
nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
.and(hasDistribution(single()))
- .and(nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
- .and(hasDistribution(IgniteDistributions.identity(0)))
- .and(input(0, isInstanceOf(IgniteIndexScan.class)))
- .and(input(1, isInstanceOf(IgniteExchange.class)
-
.and(input(isInstanceOf(IgniteIndexScan.class)))
- ))
+ .and(input(0, isInstanceOf(IgniteExchange.class)
+ .and(input(isInstanceOf(IgniteIndexScan.class)))
+ ))
+ .and(input(1, isInstanceOf(IgniteExchange.class)
+ .and(input(isInstanceOf(IgniteIndexScan.class)))
))
));
}
@@ -105,14 +104,13 @@ public class IdentityDistributionPlannerTest extends
AbstractPlannerTest {
+ "from TEST_TBL1 t1 "
+ "join TEST_TBL2 t2 on t1.id = t2.id";
- assertPlan(sql, schema,
nodeOrAnyChild(isInstanceOf(IgniteExchange.class)
+ assertPlan(sql, schema,
nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
.and(hasDistribution(single()))
- .and(nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
- .and(hasDistribution(affinityDistribution))
- .and(input(0, isInstanceOf(IgniteExchange.class)
-
.and(input(isInstanceOf(IgniteIndexScan.class)))
- ))
- .and(input(1, isInstanceOf(IgniteIndexScan.class)))
+ .and(input(0, isInstanceOf(IgniteExchange.class)
+ .and(input(isInstanceOf(IgniteIndexScan.class)))
+ ))
+ .and(input(1, isInstanceOf(IgniteExchange.class)
+ .and(input(isInstanceOf(IgniteIndexScan.class)))
))
));
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/JoinColocationPlannerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/JoinColocationPlannerTest.java
index 0a20e4afe2..6d18730a3c 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/JoinColocationPlannerTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/JoinColocationPlannerTest.java
@@ -17,14 +17,12 @@
package org.apache.ignite.internal.sql.engine.planner;
-import static org.hamcrest.CoreMatchers.equalTo;
+import static
org.apache.ignite.internal.sql.engine.trait.IgniteDistributions.single;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.hasSize;
-import java.util.List;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.util.ImmutableIntList;
@@ -32,7 +30,6 @@ import
org.apache.ignite.internal.sql.engine.framework.TestBuilders;
import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
import org.apache.ignite.internal.sql.engine.rel.IgniteMergeJoin;
-import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
import org.apache.ignite.internal.sql.engine.rel.IgniteSort;
import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Collation;
@@ -115,23 +112,19 @@ public class JoinColocationPlannerTest extends
AbstractPlannerTest {
+ "from COMPLEX_TBL t1 "
+ "join SIMPLE_TBL t2 on t1.id1 = t2.id and t1.id2 = t2.id2";
- RelNode phys = physicalPlan(sql, schema, "NestedLoopJoinConverter",
"CorrelatedNestedLoopJoin");
-
- IgniteMergeJoin join = findFirstNode(phys,
byClass(IgniteMergeJoin.class));
-
- String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys);
-
- assertThat(invalidPlanMsg, join, notNullValue());
- assertThat(invalidPlanMsg, join.distribution().function().affinity(),
is(true));
-
- List<IgniteExchange> exchanges = findNodes(phys, node -> node
instanceof IgniteExchange
- && ((IgniteRel) node).distribution().function().affinity());
-
- assertThat(invalidPlanMsg, exchanges, hasSize(1));
- assertThat(invalidPlanMsg, exchanges.get(0).getInput(0),
instanceOf(IgniteSort.class));
- assertThat(invalidPlanMsg, exchanges.get(0).getInput(0).getInput(0),
instanceOf(IgniteTableScan.class));
- assertThat(invalidPlanMsg, exchanges.get(0).getInput(0).getInput(0)
- .getTable().unwrap(IgniteTable.class), equalTo(simpleTbl));
+ assertPlan(sql, schema,
nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
+ .and(hasDistribution(complexTbl.distribution()))
+ .and(input(0, isInstanceOf(IgniteIndexScan.class)
+ .and(scan ->
complexTbl.equals(scan.getTable().unwrap(IgniteTable.class)))
+ ))
+ .and(input(1, isInstanceOf(IgniteSort.class)
+ .and(input(isInstanceOf(IgniteExchange.class)
+ .and(input(isInstanceOf(IgniteTableScan.class)
+ .and(scan ->
simpleTbl.equals(scan.getTable().unwrap(IgniteTable.class)))
+ ))
+ ))
+ ))
+ ), "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin");
}
/**
@@ -157,22 +150,19 @@ public class JoinColocationPlannerTest extends
AbstractPlannerTest {
+ "from COMPLEX_TBL_DIRECT t1 "
+ "join COMPLEX_TBL_INDIRECT t2 on t1.id1 = t2.id1 and t1.id2
= t2.id2";
- RelNode phys = physicalPlan(sql, schema, "NestedLoopJoinConverter",
"CorrelatedNestedLoopJoin");
-
- IgniteMergeJoin join = findFirstNode(phys,
byClass(IgniteMergeJoin.class));
-
- String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys);
-
- assertThat(invalidPlanMsg, join, notNullValue());
- assertThat(invalidPlanMsg, join.distribution().function().affinity(),
is(true));
-
- List<IgniteExchange> exchanges = findNodes(phys, node -> node
instanceof IgniteExchange
- && ((IgniteRel) node).distribution().function().affinity());
-
- assertThat(invalidPlanMsg, exchanges, hasSize(1));
- assertThat(invalidPlanMsg, exchanges.get(0).getInput(0),
instanceOf(IgniteIndexScan.class));
- assertThat(invalidPlanMsg, exchanges.get(0).getInput(0)
- .getTable().unwrap(IgniteTable.class),
equalTo(complexTblIndirect));
+ assertPlan(sql, schema,
nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
+ .and(hasDistribution(single()))
+ .and(input(0, isInstanceOf(IgniteExchange.class)
+ .and(input(isInstanceOf(IgniteIndexScan.class)
+ .and(scan ->
complexTblDirect.equals(scan.getTable().unwrap(IgniteTable.class)))
+ ))
+ ))
+ .and(input(1, isInstanceOf(IgniteExchange.class)
+ .and(input(isInstanceOf(IgniteIndexScan.class)
+ .and(scan ->
complexTblIndirect.equals(scan.getTable().unwrap(IgniteTable.class)))
+ ))
+ ))
+ ), "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin");
}
private static IgniteTable simpleTable(String tableName, int size) {
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java
index a006423f83..88543a92ee 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.sql.engine.planner;
import static java.util.function.Predicate.not;
+import static
org.apache.ignite.internal.sql.engine.trait.IgniteDistributions.single;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -30,14 +31,15 @@ import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.AggregateCall;
+import
org.apache.ignite.internal.sql.engine.rel.IgniteCorrelatedNestedLoopJoin;
import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
+import org.apache.ignite.internal.sql.engine.rel.IgniteLimit;
import org.apache.ignite.internal.sql.engine.rel.IgniteMergeJoin;
import org.apache.ignite.internal.sql.engine.rel.IgniteProject;
import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
import org.apache.ignite.internal.sql.engine.rel.IgniteSort;
import org.apache.ignite.internal.sql.engine.rel.agg.IgniteMapSortAggregate;
import org.apache.ignite.internal.sql.engine.rel.agg.IgniteReduceSortAggregate;
-import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
import org.apache.ignite.internal.util.ArrayUtils;
import org.junit.jupiter.api.Test;
@@ -274,6 +276,7 @@ public class MapReduceSortAggregatePlannerTest extends
AbstractAggregatePlannerT
assertPlan(TestCase.CASE_16A,
nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
.and(input(isInstanceOf(IgniteExchange.class)
+ .and(hasDistribution(single()))
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
.and(input(isIndexScan("TEST",
"idx_val0")))
))
@@ -284,6 +287,7 @@ public class MapReduceSortAggregatePlannerTest extends
AbstractAggregatePlannerT
assertPlan(TestCase.CASE_16B,
nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
.and(input(isInstanceOf(IgniteExchange.class)
+ .and(hasDistribution(single()))
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
.and(input(isIndexScan("TEST",
"idx_val0")))
))
@@ -298,17 +302,55 @@ public class MapReduceSortAggregatePlannerTest extends
AbstractAggregatePlannerT
*/
@Test
public void emptyCollationPassThroughLimit() throws Exception {
- RuntimeException e = assertThrows(RuntimeException.class,
- () -> assertPlan(TestCase.CASE_17,
isInstanceOf(IgniteRel.class), disableRules));
- assertThat(e.getMessage(), containsString("There are not enough rules
to produce a node with desired properties"));
-
- e = assertThrows(RuntimeException.class,
- () -> assertPlan(TestCase.CASE_17A,
isInstanceOf(IgniteRel.class), disableRules));
- assertThat(e.getMessage(), containsString("There are not enough rules
to produce a node with desired properties"));
+ assertPlan(TestCase.CASE_17,
+ hasChildThat(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class)
+ .and(input(1,
isInstanceOf(IgniteReduceSortAggregate.class)
+
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
+
.and(input(isInstanceOf(IgniteLimit.class)
+
.and(input(isInstanceOf(IgniteSort.class)
+
.and(input(isTableScan("TEST")))
+ ))
+ ))
+ ))
+ ))
+ ),
+ disableRules
+ );
- e = assertThrows(RuntimeException.class,
- () -> assertPlan(TestCase.CASE_17B,
isInstanceOf(IgniteRel.class), disableRules));
- assertThat(e.getMessage(), containsString("There are not enough rules
to produce a node with desired properties"));
+ assertPlan(TestCase.CASE_17A,
+ hasChildThat(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class)
+ .and(input(1,
isInstanceOf(IgniteReduceSortAggregate.class)
+
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
+
.and(input(isInstanceOf(IgniteLimit.class)
+
.and(input(isInstanceOf(IgniteExchange.class)
+
.and(hasDistribution(single()))
+
.and(input(isInstanceOf(IgniteSort.class)
+
.and(input(isTableScan("TEST")))
+ ))
+ ))
+ ))
+ ))
+ ))
+ ),
+ disableRules
+ );
+ assertPlan(TestCase.CASE_17B,
+ hasChildThat(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class)
+ .and(input(1,
isInstanceOf(IgniteReduceSortAggregate.class)
+
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
+
.and(input(isInstanceOf(IgniteLimit.class)
+
.and(input(isInstanceOf(IgniteExchange.class)
+
.and(hasDistribution(single()))
+
.and(input(isInstanceOf(IgniteSort.class)
+
.and(input(isTableScan("TEST")))
+ ))
+ ))
+ ))
+ ))
+ ))
+ ),
+ disableRules
+ );
}
/**
@@ -393,6 +435,7 @@ public class MapReduceSortAggregatePlannerTest extends
AbstractAggregatePlannerT
.and(not(hasAggregate()))
.and(hasGroups())
.and(input(isInstanceOf(IgniteExchange.class)
+ .and(hasDistribution(single()))
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
.and(not(hasAggregate()))
.and(hasGroups())
@@ -425,6 +468,7 @@ public class MapReduceSortAggregatePlannerTest extends
AbstractAggregatePlannerT
Predicate<RelNode> nonColocated =
hasChildThat(isInstanceOf(IgniteReduceSortAggregate.class)
.and(in ->
hasAggregates(countReduce).test(in.getAggregateCalls()))
.and(input(isInstanceOf(IgniteExchange.class)
+ .and(hasDistribution(single()))
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
.and(in ->
hasAggregates(countMap).test(in.getAggCallList()))
)
@@ -469,7 +513,7 @@ public class MapReduceSortAggregatePlannerTest extends
AbstractAggregatePlannerT
.and(hasCollation(RelCollations.of(0)))
.and(input(isInstanceOf(IgniteExchange.class)
.and(hasCollation(RelCollations.of(0)))
- .and(hasDistribution(IgniteDistributions.single()))
+ .and(hasDistribution(single()))
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
.and(hasCollation(RelCollations.of(1)))
.and(hasGroupSets(Aggregate::getGroupSets, 1))
@@ -516,6 +560,7 @@ public class MapReduceSortAggregatePlannerTest extends
AbstractAggregatePlannerT
assertPlan(testCase,
nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
.and(input(isInstanceOf(IgniteExchange.class)
+ .and(hasDistribution(single()))
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
.and(hasAggregate())
.and(hasGroups())
@@ -546,6 +591,7 @@ public class MapReduceSortAggregatePlannerTest extends
AbstractAggregatePlannerT
assertPlan(testCase,
nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
.and(input(isInstanceOf(IgniteExchange.class)
+ .and(hasDistribution(single()))
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
.and(hasAggregate())
.and(hasGroups())
@@ -594,6 +640,7 @@ public class MapReduceSortAggregatePlannerTest extends
AbstractAggregatePlannerT
.and(not(hasAggregate()))
.and(hasGroups())
.and(input(isInstanceOf(IgniteExchange.class)
+ .and(hasDistribution(single()))
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
.and(not(hasAggregate()))
.and(input(isInstanceOf(IgniteSort.class)
@@ -641,6 +688,7 @@ public class MapReduceSortAggregatePlannerTest extends
AbstractAggregatePlannerT
.and(not(hasAggregate()))
.and(hasGroups())
.and(input(isInstanceOf(IgniteExchange.class)
+ .and(hasDistribution(single()))
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
.and(not(hasAggregate()))
.and(hasGroups())
@@ -673,6 +721,7 @@ public class MapReduceSortAggregatePlannerTest extends
AbstractAggregatePlannerT
assertPlan(testCase,
nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
.and(input(isInstanceOf(IgniteExchange.class)
+ .and(hasDistribution(single()))
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
.and(hasAggregate())
.and(input(isIndexScan("TEST",
"idx_grp0_grp1")))
@@ -706,6 +755,7 @@ public class MapReduceSortAggregatePlannerTest extends
AbstractAggregatePlannerT
.and(not(hasAggregate()))
.and(hasGroups())
.and(input(isInstanceOf(IgniteExchange.class)
+ .and(hasDistribution(single()))
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
.and(not(hasAggregate()))
.and(hasGroups())
@@ -740,6 +790,7 @@ public class MapReduceSortAggregatePlannerTest extends
AbstractAggregatePlannerT
.and(not(hasAggregate()))
.and(hasGroups())
.and(input(isInstanceOf(IgniteExchange.class)
+ .and(hasDistribution(single()))
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
.and(not(hasAggregate()))
.and(hasGroups())
@@ -773,6 +824,7 @@ public class MapReduceSortAggregatePlannerTest extends
AbstractAggregatePlannerT
// TODO:
https://issues.apache.org/jira/browse/IGNITE-20095
// Why can't Map be pushed down to under
'exchange'.
.and(input(isInstanceOf(IgniteExchange.class)
+ .and(hasDistribution(single()))
.and(input(isInstanceOf(IgniteSort.class)
.and(s ->
s.collation().equals(collation))
.and(input(isTableScan("TEST")))
@@ -814,6 +866,7 @@ public class MapReduceSortAggregatePlannerTest extends
AbstractAggregatePlannerT
// TODO:
https://issues.apache.org/jira/browse/IGNITE-20095
// Why can't Map be pushed
down to under 'exchange'.
.and(input(isInstanceOf(IgniteExchange.class)
+
.and(hasDistribution(single()))
.and(input(isInstanceOf(IgniteSort.class)
.and(s ->
s.collation().equals(collation))
.and(input(isTableScan("TEST")
@@ -832,7 +885,7 @@ public class MapReduceSortAggregatePlannerTest extends
AbstractAggregatePlannerT
.and(hasGroupSets(IgniteReduceSortAggregate::getGroupSets, 0))
.and(hasCollation(RelCollations.of(0)))
.and(input(isInstanceOf(IgniteExchange.class)
- .and(hasDistribution(IgniteDistributions.single()))
+ .and(hasDistribution(single()))
.and(hasCollation(RelCollations.of(0)))
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
.and(hasGroupSets(Aggregate::getGroupSets, 1))