This is an automated email from the ASF dual-hosted git repository.

korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 8e053c0b39 IGNITE-21668 Sql. Disable order-preserving multi target exchange (#3356)
8e053c0b39 is described below

commit 8e053c0b39ca0f74fd44826202e3298e8b803f6f
Author: korlov42 <[email protected]>
AuthorDate: Wed Mar 6 15:57:14 2024 +0200

    IGNITE-21668 Sql. Disable order-preserving multi target exchange (#3356)
---
 .../internal/sql/engine/hint/IgniteHint.java       | 10 +--
 .../internal/sql/engine/prepare/PlannerHelper.java | 81 ------------------
 .../internal/sql/engine/prepare/PlannerPhase.java  |  3 +
 .../ignite/internal/sql/engine/rel/IgniteSort.java |  2 +-
 .../rel/agg/IgniteColocatedAggregateBase.java      |  4 +
 .../sql/engine/rule/MergeJoinConverterRule.java    |  4 +-
 .../engine/rule/SortAggregateConverterRule.java    |  5 +-
 .../sql/engine/rule/SortExchangeTransposeRule.java | 98 ++++++++++++++++++++++
 .../internal/sql/engine/trait/TraitUtils.java      |  5 ++
 .../ignite/internal/sql/engine/util/Commons.java   |  6 +-
 .../ignite/internal/sql/engine/util/HintUtils.java | 11 ---
 .../sql/engine/planner/AggregatePlannerTest.java   | 13 +--
 .../planner/ColocatedSortAggregatePlannerTest.java | 77 +++++++++++++----
 .../planner/IdentityDistributionPlannerTest.java   | 26 +++---
 .../engine/planner/JoinColocationPlannerTest.java  | 64 ++++++--------
 .../planner/MapReduceSortAggregatePlannerTest.java | 79 ++++++++++++++---
 16 files changed, 287 insertions(+), 201 deletions(-)

diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/hint/IgniteHint.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/hint/IgniteHint.java
index 46902527f3..ebda54d4dc 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/hint/IgniteHint.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/hint/IgniteHint.java
@@ -34,15 +34,7 @@ public enum IgniteHint {
     /** Disables index usage. **/
     NO_INDEX(true),
     /** Forces index usage. */
-    FORCE_INDEX(true),
-    /**
-     * Disables sorted algorithm for node this hint specified on.
-     *
-     * <p>At the moment, this hint is introduced as workaround to the problem, 
when sorting
-     * on correlated path may result in deadlock. Assigned by optimizer, 
should not be specified
-     * explicitly in the query.
-     */
-    DISABLE_SORTED_ALGORITHM;
+    FORCE_INDEX(true);
 
     private final boolean paramSupport;
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerHelper.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerHelper.java
index c4bdf1c1ce..728e4b12f3 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerHelper.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerHelper.java
@@ -31,10 +31,7 @@ import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.RelHomogeneousShuttle;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelRoot;
-import org.apache.calcite.rel.hint.RelHint;
-import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.logical.LogicalCorrelate;
-import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.rules.CoreRules;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexNode;
@@ -43,7 +40,6 @@ import org.apache.calcite.util.Pair;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.sql.engine.hint.Hints;
-import org.apache.ignite.internal.sql.engine.hint.IgniteHint;
 import org.apache.ignite.internal.sql.engine.rel.IgniteConvention;
 import org.apache.ignite.internal.sql.engine.rel.IgniteProject;
 import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
@@ -105,8 +101,6 @@ public final class PlannerHelper {
 
             rel = planner.trimUnusedFields(root.withRel(rel)).rel;
 
-            rel = new SortedAlgorithmDisabler().visit(rel);
-
             boolean amountOfJoinsAreBig = hasTooMuchJoins(rel);
             boolean enforceJoinOrder = hints.present(ENFORCE_JOIN_ORDER);
             if (amountOfJoinsAreBig || enforceJoinOrder) {
@@ -216,79 +210,4 @@ public final class PlannerHelper {
             return Math.max(countOfSources, maxCountOfSourcesInSubQuery);
         }
     }
-
-    /**
-     * Traverses relational tree and assign {@link 
IgniteHint#DISABLE_SORTED_ALGORITHM} hint
-     * to every {@link LogicalAggregate} and {@link LogicalJoin} node on right 
side of the
-     * {@link LogicalCorrelate} node.
-     *
-     * <p>This is workaround of a deadlock caused by preserving sorting on a 
correlated path.
-     *
-     * <pre>
-     * Legend:
-     *      [f#1] -- fragment #1
-     *      (n#1) -- node #1
-     *      [f#2 (n#1) (n#2) ] -- fragment #2 mapped on nodes #1 and #2
-     *
-     * Problematic subtree:
-     *       [f#1 (n#1) ]
-     *            /   \
-     *    [f#2 (n#1) (n#2) ]
-     *           |  X  |
-     *    [f#3 (n#1) (n#2) ]
-     *
-     * Problematic sequence of events:
-     *      1) f#1n#1 sets the correlated variable and requests data from f#2.
-     *      2) Request messages arrives at n#1 and n#2 accordingly. Since they 
are the first
-     *      requests, the nodes start processing immediately.
-     *      3) Processing of f#2 requires data to be pulled from f#3, thus 
requests will be sent.
-     *      4) Here comes the first factor: fragment may process only one 
correlated request at a time.
-     *      So, f#3 start processing of request from f#2n#1 immediately, while 
request from f#2n#2 will
-     *      be postponed.
-     *      5) f#3 prepares first batch of data, sends it to the f#2n#1 and 
waits until the next batch will be
-     *      requested. Request from f#2n#2 still postponed until all data for 
correlate from f#2n#1 will be processed.
-     *      6) f#2n#1 propagates received batch to f#1n#1 and waits for the 
next request.
-     *      7) Here comes the second and final factor: since f#1n#1 expects 
data to be sorted, exchange between
-     *      f#1 and f#2 will waits for batches from all parties (f#2n#1 and 
f#2n#2 in our case).
-     *
-     * </pre>
-     */
-    private static class SortedAlgorithmDisabler extends RelHomogeneousShuttle 
{
-        private static final RelHint DISABLE_SORTED_ALGORITHM = 
RelHint.builder(IgniteHint.DISABLE_SORTED_ALGORITHM.name()).build();
-
-        private int correlated = 0;
-
-        @Override
-        public RelNode visit(LogicalAggregate aggregate) {
-            if (correlated > 0) {
-                List<RelHint> hints = new ArrayList<>(aggregate.getHints());
-                hints.add(DISABLE_SORTED_ALGORITHM);
-                aggregate = (LogicalAggregate) aggregate.withHints(hints);
-            }
-
-            return super.visit(aggregate);
-        }
-
-        @Override
-        public RelNode visit(LogicalJoin join) {
-            if (correlated > 0) {
-                List<RelHint> hints = new ArrayList<>(join.getHints());
-                hints.add(DISABLE_SORTED_ALGORITHM);
-                join = (LogicalJoin) join.withHints(hints);
-            }
-
-            return super.visit(join);
-        }
-
-        @Override
-        public RelNode visit(LogicalCorrelate correlate) {
-            RelNode output = visitChild(correlate, 0, correlate.getInput(0));
-
-            correlated++;
-            output = visitChild(output, 1, correlate.getInput(1));
-            correlated--;
-
-            return output;
-        }
-    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
index 120a12014b..48d6344826 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
@@ -57,6 +57,7 @@ import 
org.apache.ignite.internal.sql.engine.rule.ProjectConverterRule;
 import org.apache.ignite.internal.sql.engine.rule.SetOpConverterRule;
 import org.apache.ignite.internal.sql.engine.rule.SortAggregateConverterRule;
 import org.apache.ignite.internal.sql.engine.rule.SortConverterRule;
+import org.apache.ignite.internal.sql.engine.rule.SortExchangeTransposeRule;
 import 
org.apache.ignite.internal.sql.engine.rule.TableFunctionScanConverterRule;
 import org.apache.ignite.internal.sql.engine.rule.TableModifyConverterRule;
 import org.apache.ignite.internal.sql.engine.rule.TableModifyToKeyValuePutRule;
@@ -184,6 +185,8 @@ public enum PlannerPhase {
                             b.operand(LogicalSort.class)
                                     .anyInputs()).toRule(),
 
+            SortExchangeTransposeRule.INSTANCE,
+
             CoreRules.UNION_MERGE,
             CoreRules.MINUS_MERGE,
             CoreRules.INTERSECT_MERGE,
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteSort.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteSort.java
index 36777dfaff..fbdcd3183e 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteSort.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteSort.java
@@ -168,7 +168,7 @@ public class IgniteSort extends Sort implements IgniteRel {
 
         // Distributed sorting is more preferable than sorting on the single 
node.
         if (TraitUtils.distributionEnabled(this) && 
TraitUtils.distribution(traitSet).satisfies(IgniteDistributions.single())) {
-            cost.plus(costFactory.makeTinyCost());
+            cost = cost.plus(costFactory.makeTinyCost());
         }
 
         return cost;
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/IgniteColocatedAggregateBase.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/IgniteColocatedAggregateBase.java
index 15a020d8b1..565193b70b 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/IgniteColocatedAggregateBase.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/IgniteColocatedAggregateBase.java
@@ -103,6 +103,10 @@ public abstract class IgniteColocatedAggregateBase extends 
IgniteAggregate imple
         IgniteDistribution newOutDistribution = 
newInDistribution.apply(Commons.trimmingMapping(rowType.getFieldCount(), 
groupSet));
 
         return List.of(
+                Pair.of(
+                        nodeTraits.replace(IgniteDistributions.single()),
+                        
List.of(inputTraits.get(0).replace(IgniteDistributions.single()))
+                ),
                 Pair.of(
                         nodeTraits.replace(newOutDistribution),
                         List.of(inputTraits.get(0).replace(newInDistribution))
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/MergeJoinConverterRule.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/MergeJoinConverterRule.java
index c690e9b421..72b60eb258 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/MergeJoinConverterRule.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/MergeJoinConverterRule.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.sql.engine.rule;
 
-import static 
org.apache.ignite.internal.sql.engine.util.HintUtils.isSortedAlgorithmAllowed;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 
 import org.apache.calcite.plan.RelOptCluster;
@@ -53,8 +52,7 @@ public class MergeJoinConverterRule extends 
AbstractIgniteConverterRule<LogicalJ
         LogicalJoin logicalJoin = call.rel(0);
 
         return !nullOrEmpty(logicalJoin.analyzeCondition().pairs())
-                && logicalJoin.analyzeCondition().isEqui()
-                && isSortedAlgorithmAllowed(logicalJoin);
+                && logicalJoin.analyzeCondition().isEqui();
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/SortAggregateConverterRule.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/SortAggregateConverterRule.java
index 5134d9ec2e..6dd3c82009 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/SortAggregateConverterRule.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/SortAggregateConverterRule.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.sql.engine.rule;
 
 import static 
org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates.canBeImplementedAsMapReduce;
-import static 
org.apache.ignite.internal.sql.engine.util.HintUtils.isSortedAlgorithmAllowed;
 import static 
org.apache.ignite.internal.sql.engine.util.PlanUtils.complexDistinctAgg;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 
@@ -73,8 +72,7 @@ public class SortAggregateConverterRule {
             LogicalAggregate aggregate = call.rel(0);
 
             return !HintUtils.isExpandDistinctAggregate(aggregate)
-                    && aggregate.getGroupSets().size() == 1
-                    && isSortedAlgorithmAllowed(aggregate);
+                    && aggregate.getGroupSets().size() == 1;
         }
 
         /** {@inheritDoc} */
@@ -117,7 +115,6 @@ public class SortAggregateConverterRule {
 
             return !HintUtils.isExpandDistinctAggregate(aggregate)
                     && (nullOrEmpty(aggregate.getGroupSet()) || 
aggregate.getGroupSets().size() == 1)
-                    && isSortedAlgorithmAllowed(aggregate)
                     && canBeImplementedAsMapReduce(aggregate.getAggCallList())
                     && !complexDistinctAgg(aggregate.getAggCallList());
         }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/SortExchangeTransposeRule.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/SortExchangeTransposeRule.java
new file mode 100644
index 0000000000..fbf714a436
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/SortExchangeTransposeRule.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.rule;
+
+import static org.apache.ignite.internal.sql.engine.trait.IgniteDistributions.single;
+import static org.apache.ignite.internal.sql.engine.trait.TraitUtils.distribution;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelDistribution.Type;
+import org.apache.calcite.rel.RelNode;
+import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
+import org.apache.ignite.internal.sql.engine.rel.IgniteSort;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
+import org.immutables.value.Value;
+
+/**
+ * A rule that pushes {@link IgniteSort} node under {@link IgniteExchange}.
+ */
+@Value.Enclosing
+public class SortExchangeTransposeRule extends RelRule<SortExchangeTransposeRule.Config> {
+    public static final RelOptRule INSTANCE = Config.INSTANCE.toRule();
+
+    private SortExchangeTransposeRule(Config cfg) {
+        super(cfg);
+    }
+
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+        IgniteExchange exchange = call.rel(1);
+
+        return hashAlike(distribution(exchange.getInput()))
+                && exchange.distribution() == single();
+    }
+
+    private static boolean hashAlike(IgniteDistribution distribution) {
+        return distribution.getType() == Type.HASH_DISTRIBUTED
+                || distribution.getType() == Type.RANDOM_DISTRIBUTED;
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        IgniteSort sort = call.rel(0);
+        IgniteExchange exchange = call.rel(1);
+
+        RelOptCluster cluster = sort.getCluster();
+        RelCollation collation = sort.collation();
+        RelNode input = exchange.getInput();
+
+        call.transformTo(
+                new IgniteExchange(
+                        cluster,
+                        exchange.getTraitSet()
+                                .replace(collation),
+                        convert(input, input.getTraitSet().replace(collation)),
+                        exchange.distribution()
+                )
+        );
+    }
+
+    /** Configuration. */
+    @SuppressWarnings({"ClassNameSameAsAncestorName", "InnerClassFieldHidesOuterClassField"})
+    @Value.Immutable
+    public interface Config extends RelRule.Config {
+        Config INSTANCE = ImmutableSortExchangeTransposeRule.Config.of()
+                .withDescription("SortExchangeTransposeRule")
+                .withOperandSupplier(o0 ->
+                        o0.operand(IgniteSort.class)
+                                .oneInput(o1 ->
+                                        o1.operand(IgniteExchange.class)
+                                                .anyInputs()))
+                .as(Config.class);
+
+        /** {@inheritDoc} */
+        @Override
+        default SortExchangeTransposeRule toRule() {
+            return new SortExchangeTransposeRule(this);
+        }
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
index d9fed6570f..638060a75a 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
@@ -22,6 +22,7 @@ import static java.util.Collections.singletonList;
 import static org.apache.calcite.plan.RelOptUtil.permutationPushDownProject;
 import static 
org.apache.calcite.rel.RelDistribution.Type.BROADCAST_DISTRIBUTED;
 import static org.apache.calcite.rel.RelDistribution.Type.HASH_DISTRIBUTED;
+import static org.apache.calcite.rel.RelDistribution.Type.SINGLETON;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static org.apache.ignite.internal.util.CollectionUtils.first;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
@@ -165,6 +166,10 @@ public class TraitUtils {
         if (fromTrait.getType() == BROADCAST_DISTRIBUTED && toTrait.getType() 
== HASH_DISTRIBUTED) {
             return new IgniteTrimExchange(rel.getCluster(), traits, rel, 
toTrait);
         } else {
+            if (toTrait.getType() != SINGLETON && collation(traits) != 
RelCollations.EMPTY) {
+                return null;
+            }
+
             return new IgniteExchange(
                     rel.getCluster(),
                     traits,
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
index d442d95e59..d4e5890639 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
@@ -63,7 +63,6 @@ import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelTraitDef;
 import org.apache.calcite.rel.RelCollationTraitDef;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.hint.HintPredicates;
 import org.apache.calcite.rel.hint.HintStrategyTable;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.sql.SqlKind;
@@ -138,8 +137,8 @@ public final class Commons {
     @SuppressWarnings("rawtypes")
     public static final List<RelTraitDef> DISTRIBUTED_TRAITS_SET = List.of(
             ConventionTraitDef.INSTANCE,
-            RelCollationTraitDef.INSTANCE,
-            DistributionTraitDef.INSTANCE
+            DistributionTraitDef.INSTANCE,
+            RelCollationTraitDef.INSTANCE
     );
 
     public static final FrameworkConfig FRAMEWORK_CONFIG = 
Frameworks.newConfigBuilder()
@@ -159,7 +158,6 @@ public final class Commons {
                                     
.hintStrategy(IgniteHint.EXPAND_DISTINCT_AGG.name(), AGGREGATE)
                                     .hintStrategy(IgniteHint.NO_INDEX.name(), 
(hint, rel) -> rel instanceof IgniteLogicalTableScan)
                                     
.hintStrategy(IgniteHint.FORCE_INDEX.name(), (hint, rel) -> rel instanceof 
IgniteLogicalTableScan)
-                                    
.hintStrategy(IgniteHint.DISABLE_SORTED_ALGORITHM.name(), 
HintPredicates.and(JOIN, AGGREGATE))
                                     .build()
                     )
             )
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HintUtils.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HintUtils.java
index 864aa7bb9f..1f5770097b 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HintUtils.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HintUtils.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.sql.engine.util;
 
-import static 
org.apache.ignite.internal.sql.engine.hint.IgniteHint.DISABLE_SORTED_ALGORITHM;
 import static 
org.apache.ignite.internal.sql.engine.hint.IgniteHint.EXPAND_DISTINCT_AGG;
 
 import java.util.Arrays;
@@ -83,14 +82,4 @@ public class HintUtils {
         return rel.getCluster().getHintStrategies()
                 .apply(hintList, rel);
     }
-
-    /**
-     * Return {@code true} if {@link IgniteHint#DISABLE_SORTED_ALGORITHM} hint 
is not presented in provided relational node.
-     *
-     * @param rel Relation of interest.
-     */
-    public static <T extends RelNode & Hintable> boolean 
isSortedAlgorithmAllowed(T rel) {
-        return rel.getHints().stream()
-                .noneMatch(r -> 
r.hintName.equals(DISABLE_SORTED_ALGORITHM.name()));
-    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
index 3e01245939..32363f8cb2 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.sql.engine.planner;
 
 import static java.util.function.Predicate.not;
+import static 
org.apache.ignite.internal.sql.engine.trait.IgniteDistributions.single;
 
 import java.util.List;
 import java.util.Objects;
@@ -37,7 +38,6 @@ import 
org.apache.ignite.internal.sql.engine.rel.agg.IgniteMapHashAggregate;
 import org.apache.ignite.internal.sql.engine.rel.agg.IgniteMapSortAggregate;
 import org.apache.ignite.internal.sql.engine.rel.agg.IgniteReduceHashAggregate;
 import org.apache.ignite.internal.sql.engine.rel.agg.IgniteReduceSortAggregate;
-import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
 import org.junit.jupiter.api.Test;
 
@@ -479,7 +479,7 @@ public class AggregatePlannerTest extends 
AbstractAggregatePlannerTest {
                 ));
 
         Predicate<IgniteExchange> colocatedGroupBy = 
isInstanceOf(IgniteExchange.class)
-                .and(hasDistribution(IgniteDistributions.single()))
+                .and(hasDistribution(single()))
                 .and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
                         .and(in -> 
hasAggregates(countMap).test(in.getAggCallList()))
                         .and(input(isTableScan("TEST")))
@@ -500,10 +500,10 @@ public class AggregatePlannerTest extends 
AbstractAggregatePlannerTest {
         Predicate<IgniteColocatedHashAggregate> nonColocated = 
isInstanceOf(IgniteColocatedHashAggregate.class)
                 .and(in -> hasAggregates(countMap).test(in.getAggCallList()))
                 .and(input(isInstanceOf(IgniteExchange.class)
-                        .and(hasDistribution(IgniteDistributions.single()))));
+                        .and(hasDistribution(single()))));
 
         Predicate<IgniteExchange> colocatedGroupBy = 
isInstanceOf(IgniteExchange.class)
-                .and(hasDistribution(IgniteDistributions.single()))
+                .and(hasDistribution(single()))
                 .and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
                         .and(in -> 
hasAggregates(countMap).test(in.getAggCallList()))
                         .and(input(isTableScan("TEST")))
@@ -539,7 +539,7 @@ public class AggregatePlannerTest extends 
AbstractAggregatePlannerTest {
         Predicate<RelNode> colocated = 
nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
                 .and(hasNoGroupSets(IgniteReduceSortAggregate::getGroupSets))
                 .and(input(isInstanceOf(IgniteExchange.class)
-                        .and(hasDistribution(IgniteDistributions.single())
+                        .and(hasDistribution(single())
                                 
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
                                         
.and(hasNoGroupSets(IgniteMapSortAggregate::getGroupSets))
                                         
.and(input(isInstanceOf(IgniteColocatedHashAggregate.class)
@@ -802,6 +802,7 @@ public class AggregatePlannerTest extends 
AbstractAggregatePlannerTest {
         assertPlan(testCase,
                 isInstanceOf(IgniteColocatedSortAggregate.class)
                         .and(input(isInstanceOf(IgniteExchange.class)
+                                .and(hasDistribution(single()))
                                 .and(input(isInstanceOf(IgniteSort.class)
                                         .and(s -> 
s.collation().equals(collation))
                                         .and(input(isTableScan("TEST")))
@@ -815,7 +816,7 @@ public class AggregatePlannerTest extends 
AbstractAggregatePlannerTest {
                 
.and(nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class)
                         
.and(hasGroupSets(IgniteReduceHashAggregate::getGroupSets, 0))
                         .and(input(isInstanceOf(IgniteExchange.class)
-                                
.and(hasDistribution(IgniteDistributions.single())
+                                .and(hasDistribution(single())
                                         
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
                                                 
.and(hasGroupSets(IgniteMapHashAggregate::getGroupSets, 1))
                                         ))
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java
index 7f533e86bb..5d6c01439e 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ColocatedSortAggregatePlannerTest.java
@@ -18,9 +18,7 @@
 package org.apache.ignite.internal.sql.engine.planner;
 
 import static java.util.function.Predicate.not;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.containsString;
-import static org.junit.jupiter.api.Assertions.assertThrows;
+import static 
org.apache.ignite.internal.sql.engine.trait.IgniteDistributions.single;
 
 import java.util.List;
 import java.util.Objects;
@@ -28,13 +26,13 @@ import java.util.function.Predicate;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.AggregateCall;
+import 
org.apache.ignite.internal.sql.engine.rel.IgniteCorrelatedNestedLoopJoin;
 import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
+import org.apache.ignite.internal.sql.engine.rel.IgniteLimit;
 import org.apache.ignite.internal.sql.engine.rel.IgniteMergeJoin;
-import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
 import org.apache.ignite.internal.sql.engine.rel.IgniteSort;
 import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
 import 
org.apache.ignite.internal.sql.engine.rel.agg.IgniteColocatedSortAggregate;
-import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
 import org.apache.ignite.internal.util.ArrayUtils;
 import org.junit.jupiter.api.Test;
@@ -272,6 +270,7 @@ public class ColocatedSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
                 not(nodeOrAnyChild(isInstanceOf(IgniteSort.class)))
                         .and(nodeOrAnyChild(input(1, 
isInstanceOf(IgniteColocatedSortAggregate.class)
                                         
.and(input(isInstanceOf(IgniteExchange.class)
+                                                .and(hasDistribution(single()))
                                                 .and(input(isIndexScan("TEST", 
"idx_val0")))
                                         ))
                                 ))
@@ -282,6 +281,7 @@ public class ColocatedSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
                 not(nodeOrAnyChild(isInstanceOf(IgniteSort.class)))
                         .and(nodeOrAnyChild(input(1, 
isInstanceOf(IgniteColocatedSortAggregate.class)
                                         
.and(input(isInstanceOf(IgniteExchange.class)
+                                                .and(hasDistribution(single()))
                                                 .and(input(isIndexScan("TEST", 
"idx_val0")))
                                         ))
                                 ))
@@ -295,17 +295,49 @@ public class ColocatedSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
      */
     @Test
     public void emptyCollationPassThroughLimit() throws Exception {
-        RuntimeException e = assertThrows(RuntimeException.class,
-                () -> assertPlan(TestCase.CASE_17, 
isInstanceOf(IgniteRel.class), disableRules));
-        assertThat(e.getMessage(), containsString("There are not enough rules 
to produce a node with desired properties"));
-
-        e = assertThrows(RuntimeException.class,
-                () -> assertPlan(TestCase.CASE_17A, 
isInstanceOf(IgniteRel.class), disableRules));
-        assertThat(e.getMessage(), containsString("There are not enough rules 
to produce a node with desired properties"));
+        assertPlan(TestCase.CASE_17,
+                hasChildThat(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class)
+                        .and(input(1, 
isInstanceOf(IgniteColocatedSortAggregate.class)
+                                .and(input(isInstanceOf(IgniteLimit.class)
+                                        
.and(input(isInstanceOf(IgniteSort.class)
+                                                
.and(input(isTableScan("TEST")))
+                                        ))
+                                ))
+                        ))
+                ),
+                disableRules
+        );
 
-        e = assertThrows(RuntimeException.class,
-                () -> assertPlan(TestCase.CASE_17B, 
isInstanceOf(IgniteRel.class), disableRules));
-        assertThat(e.getMessage(), containsString("There are not enough rules 
to produce a node with desired properties"));
+        assertPlan(TestCase.CASE_17A,
+                hasChildThat(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class)
+                        .and(input(1, 
isInstanceOf(IgniteColocatedSortAggregate.class)
+                                .and(input(isInstanceOf(IgniteLimit.class)
+                                        
.and(input(isInstanceOf(IgniteExchange.class)
+                                                .and(hasDistribution(single()))
+                                                
.and(input(isInstanceOf(IgniteSort.class)
+                                                        
.and(input(isTableScan("TEST")))
+                                                ))
+                                        ))
+                                ))
+                        ))
+                ),
+                disableRules
+        );
+        assertPlan(TestCase.CASE_17B,
+                hasChildThat(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class)
+                        .and(input(1, 
isInstanceOf(IgniteColocatedSortAggregate.class)
+                                .and(input(isInstanceOf(IgniteLimit.class)
+                                        
.and(input(isInstanceOf(IgniteExchange.class)
+                                                .and(hasDistribution(single()))
+                                                
.and(input(isInstanceOf(IgniteSort.class)
+                                                        
.and(input(isTableScan("TEST")))
+                                                ))
+                                        ))
+                                ))
+                        ))
+                ),
+                disableRules
+        );
     }
 
     /**
@@ -408,7 +440,7 @@ public class ColocatedSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
         Predicate<IgniteColocatedSortAggregate> checkPlan = 
isInstanceOf(IgniteColocatedSortAggregate.class)
                 .and(in -> hasAggregates(countMap).test(in.getAggCallList()))
                 .and(input(isInstanceOf(IgniteExchange.class)
-                        .and(hasDistribution(IgniteDistributions.single()))));
+                        .and(hasDistribution(single()))));
 
         assertPlan(TestCase.CASE_22, checkPlan, disableRules);
         assertPlan(TestCase.CASE_22A, checkPlan, disableRules);
@@ -426,7 +458,7 @@ public class ColocatedSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
         Predicate<IgniteColocatedSortAggregate> checkPlan = 
isInstanceOf(IgniteColocatedSortAggregate.class)
                 .and(in -> hasAggregates(countMap).test(in.getAggCallList()))
                 .and(input(isInstanceOf(IgniteExchange.class)
-                        .and(hasDistribution(IgniteDistributions.single()))));
+                        .and(hasDistribution(single()))));
 
         assertPlan(TestCase.CASE_23, checkPlan, disableRules);
         assertPlan(TestCase.CASE_23A, checkPlan, disableRules);
@@ -500,6 +532,7 @@ public class ColocatedSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
                         .and(hasAggregate())
                         .and(hasGroups())
                         .and(input(isInstanceOf(IgniteExchange.class)
+                                .and(hasDistribution(single()))
                                 .and(input(isInstanceOf(IgniteSort.class)
                                         .and(input(isTableScan("TEST")))
                                 ))
@@ -550,6 +583,7 @@ public class ColocatedSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
                         .and(hasDistinctAggregate())
                         .and(hasGroups())
                         .and(input(isInstanceOf(IgniteExchange.class)
+                                .and(hasDistribution(single()))
                                 .and(input(isInstanceOf(IgniteSort.class)
                                         .and(input(isTableScan("TEST")))
                                 ))
@@ -574,6 +608,7 @@ public class ColocatedSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
                 nodeOrAnyChild(isInstanceOf(IgniteColocatedSortAggregate.class)
                         .and(hasAggregate())
                         .and(input(isInstanceOf(IgniteExchange.class)
+                                .and(hasDistribution(single()))
                                 .and(input(isIndexScan("TEST", 
"idx_grp0_grp1")))
                         ))
                 ),
@@ -584,6 +619,7 @@ public class ColocatedSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
     private void checkAggWithColocatedGroupByIndexColumnsHash(TestCase 
testCase) throws Exception {
         assertPlan(testCase,
                 nodeOrAnyChild(isInstanceOf(IgniteExchange.class)
+                        .and(hasDistribution(single()))
                         
.and(nodeOrAnyChild(isInstanceOf(IgniteColocatedSortAggregate.class)
                                 .and(hasAggregate())
                                 .and(input(isIndexScan("TEST", 
"idx_grp0_grp1")))
@@ -623,6 +659,7 @@ public class ColocatedSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
                         .and(not(hasAggregate()))
                         .and(hasGroups())
                         .and(input(isInstanceOf(IgniteExchange.class)
+                                .and(hasDistribution(single()))
                                 .and(input(isIndexScan("TEST", 
"idx_grp0_grp1")))
                         ))
                 ),
@@ -633,6 +670,7 @@ public class ColocatedSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
     private void checkColocatedGroupWithNoAggregateUseIndexHash(TestCase 
testCase) throws Exception {
         assertPlan(testCase,
                 nodeOrAnyChild(isInstanceOf(IgniteExchange.class)
+                        .and(hasDistribution(single()))
                         
.and(input(isInstanceOf(IgniteColocatedSortAggregate.class)
                                 .and(not(hasAggregate()))
                                 .and(hasGroups())
@@ -649,6 +687,7 @@ public class ColocatedSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
                         .and(not(hasAggregate()))
                         .and(hasGroups())
                         .and(input(isInstanceOf(IgniteExchange.class)
+                                .and(hasDistribution(single()))
                                 .and(input(isInstanceOf(IgniteSort.class)
                                         .and(input(isTableScan("TEST")))
                                 ))
@@ -684,6 +723,7 @@ public class ColocatedSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
         assertPlan(testCase,
                 isInstanceOf(IgniteColocatedSortAggregate.class)
                         .and(input(isInstanceOf(IgniteExchange.class)
+                                .and(hasDistribution(single()))
                                 .and(input(isInstanceOf(IgniteSort.class)
                                         .and(s -> 
s.collation().equals(collation))
                                         .and(input(isTableScan("TEST")))
@@ -713,6 +753,7 @@ public class ColocatedSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
                         .and(s -> 
s.collation().equals(TraitUtils.createCollation(List.of(0, 1, 2))))
                         
.and(input(isInstanceOf(IgniteColocatedSortAggregate.class)
                                 .and(input(isInstanceOf(IgniteExchange.class)
+                                        .and(hasDistribution(single()))
                                         
.and(input(isInstanceOf(IgniteSort.class)
                                                 .and(s -> 
s.collation().equals(TraitUtils.createCollation(List.of(0, 1))))
                                                 
.and(input(isTableScan("TEST")))
@@ -727,7 +768,7 @@ public class ColocatedSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
         assertPlan(testCase, isInstanceOf(IgniteColocatedSortAggregate.class)
                 
.and(hasNoGroupSets(IgniteColocatedSortAggregate::getGroupSets))
                 .and(input(isInstanceOf(IgniteExchange.class)
-                        .and(hasDistribution(IgniteDistributions.single()))
+                        .and(hasDistribution(single()))
                         .and(input(isInstanceOf(IgniteTableScan.class))))
                 ), disableRules);
     }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/IdentityDistributionPlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/IdentityDistributionPlannerTest.java
index aa6043f33c..7b92276159 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/IdentityDistributionPlannerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/IdentityDistributionPlannerTest.java
@@ -77,14 +77,13 @@ public class IdentityDistributionPlannerTest extends 
AbstractPlannerTest {
                 + "from TEST_TBL1 t1 "
                 + "join TEST_TBL2 t2 on t1.id = t2.id";
 
-        assertPlan(sql, schema, 
nodeOrAnyChild(isInstanceOf(IgniteExchange.class)
+        assertPlan(sql, schema, 
nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
                 .and(hasDistribution(single()))
-                .and(nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
-                        .and(hasDistribution(IgniteDistributions.identity(0)))
-                        .and(input(0, isInstanceOf(IgniteIndexScan.class)))
-                        .and(input(1, isInstanceOf(IgniteExchange.class)
-                                
.and(input(isInstanceOf(IgniteIndexScan.class)))
-                        ))
+                .and(input(0, isInstanceOf(IgniteExchange.class)
+                        .and(input(isInstanceOf(IgniteIndexScan.class)))
+                ))
+                .and(input(1, isInstanceOf(IgniteExchange.class)
+                        .and(input(isInstanceOf(IgniteIndexScan.class)))
                 ))
         ));
     }
@@ -105,14 +104,13 @@ public class IdentityDistributionPlannerTest extends 
AbstractPlannerTest {
                 + "from TEST_TBL1 t1 "
                 + "join TEST_TBL2 t2 on t1.id = t2.id";
 
-        assertPlan(sql, schema, 
nodeOrAnyChild(isInstanceOf(IgniteExchange.class)
+        assertPlan(sql, schema, 
nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
                 .and(hasDistribution(single()))
-                .and(nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
-                        .and(hasDistribution(affinityDistribution))
-                        .and(input(0, isInstanceOf(IgniteExchange.class)
-                                
.and(input(isInstanceOf(IgniteIndexScan.class)))
-                        ))
-                        .and(input(1, isInstanceOf(IgniteIndexScan.class)))
+                .and(input(0, isInstanceOf(IgniteExchange.class)
+                        .and(input(isInstanceOf(IgniteIndexScan.class)))
+                ))
+                .and(input(1, isInstanceOf(IgniteExchange.class)
+                        .and(input(isInstanceOf(IgniteIndexScan.class)))
                 ))
         ));
     }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/JoinColocationPlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/JoinColocationPlannerTest.java
index 0a20e4afe2..6d18730a3c 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/JoinColocationPlannerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/JoinColocationPlannerTest.java
@@ -17,14 +17,12 @@
 
 package org.apache.ignite.internal.sql.engine.planner;
 
-import static org.hamcrest.CoreMatchers.equalTo;
+import static 
org.apache.ignite.internal.sql.engine.trait.IgniteDistributions.single;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.hasSize;
 
-import java.util.List;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.util.ImmutableIntList;
@@ -32,7 +30,6 @@ import 
org.apache.ignite.internal.sql.engine.framework.TestBuilders;
 import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
 import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
 import org.apache.ignite.internal.sql.engine.rel.IgniteMergeJoin;
-import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
 import org.apache.ignite.internal.sql.engine.rel.IgniteSort;
 import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
 import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Collation;
@@ -115,23 +112,19 @@ public class JoinColocationPlannerTest extends 
AbstractPlannerTest {
                 + "from COMPLEX_TBL t1 "
                 + "join SIMPLE_TBL t2 on t1.id1 = t2.id and t1.id2 = t2.id2";
 
-        RelNode phys = physicalPlan(sql, schema, "NestedLoopJoinConverter", 
"CorrelatedNestedLoopJoin");
-
-        IgniteMergeJoin join = findFirstNode(phys, 
byClass(IgniteMergeJoin.class));
-
-        String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys);
-
-        assertThat(invalidPlanMsg, join, notNullValue());
-        assertThat(invalidPlanMsg, join.distribution().function().affinity(), 
is(true));
-
-        List<IgniteExchange> exchanges = findNodes(phys, node -> node 
instanceof IgniteExchange
-                && ((IgniteRel) node).distribution().function().affinity());
-
-        assertThat(invalidPlanMsg, exchanges, hasSize(1));
-        assertThat(invalidPlanMsg, exchanges.get(0).getInput(0), 
instanceOf(IgniteSort.class));
-        assertThat(invalidPlanMsg, exchanges.get(0).getInput(0).getInput(0), 
instanceOf(IgniteTableScan.class));
-        assertThat(invalidPlanMsg, exchanges.get(0).getInput(0).getInput(0)
-                .getTable().unwrap(IgniteTable.class), equalTo(simpleTbl));
+        assertPlan(sql, schema, 
nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
+                .and(hasDistribution(complexTbl.distribution()))
+                .and(input(0, isInstanceOf(IgniteIndexScan.class)
+                        .and(scan -> 
complexTbl.equals(scan.getTable().unwrap(IgniteTable.class)))
+                ))
+                .and(input(1, isInstanceOf(IgniteSort.class)
+                        .and(input(isInstanceOf(IgniteExchange.class)
+                                .and(input(isInstanceOf(IgniteTableScan.class)
+                                        .and(scan -> 
simpleTbl.equals(scan.getTable().unwrap(IgniteTable.class)))
+                                ))
+                        ))
+                ))
+        ), "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin");
     }
 
     /**
@@ -157,22 +150,19 @@ public class JoinColocationPlannerTest extends 
AbstractPlannerTest {
                 + "from COMPLEX_TBL_DIRECT t1 "
                 + "join COMPLEX_TBL_INDIRECT t2 on t1.id1 = t2.id1 and t1.id2 
= t2.id2";
 
-        RelNode phys = physicalPlan(sql, schema, "NestedLoopJoinConverter", 
"CorrelatedNestedLoopJoin");
-
-        IgniteMergeJoin join = findFirstNode(phys, 
byClass(IgniteMergeJoin.class));
-
-        String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys);
-
-        assertThat(invalidPlanMsg, join, notNullValue());
-        assertThat(invalidPlanMsg, join.distribution().function().affinity(), 
is(true));
-
-        List<IgniteExchange> exchanges = findNodes(phys, node -> node 
instanceof IgniteExchange
-                && ((IgniteRel) node).distribution().function().affinity());
-
-        assertThat(invalidPlanMsg, exchanges, hasSize(1));
-        assertThat(invalidPlanMsg, exchanges.get(0).getInput(0), 
instanceOf(IgniteIndexScan.class));
-        assertThat(invalidPlanMsg, exchanges.get(0).getInput(0)
-                .getTable().unwrap(IgniteTable.class), 
equalTo(complexTblIndirect));
+        assertPlan(sql, schema, 
nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
+                .and(hasDistribution(single()))
+                .and(input(0, isInstanceOf(IgniteExchange.class)
+                        .and(input(isInstanceOf(IgniteIndexScan.class)
+                                .and(scan -> 
complexTblDirect.equals(scan.getTable().unwrap(IgniteTable.class)))
+                        ))
+                ))
+                .and(input(1, isInstanceOf(IgniteExchange.class)
+                        .and(input(isInstanceOf(IgniteIndexScan.class)
+                                .and(scan -> 
complexTblIndirect.equals(scan.getTable().unwrap(IgniteTable.class)))
+                        ))
+                ))
+        ), "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin");
     }
 
     private static IgniteTable simpleTable(String tableName, int size) {
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java
index a006423f83..88543a92ee 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceSortAggregatePlannerTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.sql.engine.planner;
 
 import static java.util.function.Predicate.not;
+import static 
org.apache.ignite.internal.sql.engine.trait.IgniteDistributions.single;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -30,14 +31,15 @@ import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
+import 
org.apache.ignite.internal.sql.engine.rel.IgniteCorrelatedNestedLoopJoin;
 import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
+import org.apache.ignite.internal.sql.engine.rel.IgniteLimit;
 import org.apache.ignite.internal.sql.engine.rel.IgniteMergeJoin;
 import org.apache.ignite.internal.sql.engine.rel.IgniteProject;
 import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
 import org.apache.ignite.internal.sql.engine.rel.IgniteSort;
 import org.apache.ignite.internal.sql.engine.rel.agg.IgniteMapSortAggregate;
 import org.apache.ignite.internal.sql.engine.rel.agg.IgniteReduceSortAggregate;
-import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
 import org.apache.ignite.internal.util.ArrayUtils;
 import org.junit.jupiter.api.Test;
@@ -274,6 +276,7 @@ public class MapReduceSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
         assertPlan(TestCase.CASE_16A,
                 nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
                         .and(input(isInstanceOf(IgniteExchange.class)
+                                .and(hasDistribution(single()))
                                 
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
                                         .and(input(isIndexScan("TEST", 
"idx_val0")))
                                 ))
@@ -284,6 +287,7 @@ public class MapReduceSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
         assertPlan(TestCase.CASE_16B,
                 nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
                         .and(input(isInstanceOf(IgniteExchange.class)
+                                .and(hasDistribution(single()))
                                 
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
                                         .and(input(isIndexScan("TEST", 
"idx_val0")))
                                 ))
@@ -298,17 +302,55 @@ public class MapReduceSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
      */
     @Test
     public void emptyCollationPassThroughLimit() throws Exception {
-        RuntimeException e = assertThrows(RuntimeException.class,
-                () -> assertPlan(TestCase.CASE_17, 
isInstanceOf(IgniteRel.class), disableRules));
-        assertThat(e.getMessage(), containsString("There are not enough rules 
to produce a node with desired properties"));
-
-        e = assertThrows(RuntimeException.class,
-                () -> assertPlan(TestCase.CASE_17A, 
isInstanceOf(IgniteRel.class), disableRules));
-        assertThat(e.getMessage(), containsString("There are not enough rules 
to produce a node with desired properties"));
+        assertPlan(TestCase.CASE_17,
+                hasChildThat(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class)
+                        .and(input(1, 
isInstanceOf(IgniteReduceSortAggregate.class)
+                                
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
+                                        
.and(input(isInstanceOf(IgniteLimit.class)
+                                                
.and(input(isInstanceOf(IgniteSort.class)
+                                                        
.and(input(isTableScan("TEST")))
+                                                ))
+                                        ))
+                                ))
+                        ))
+                ),
+                disableRules
+        );
 
-        e = assertThrows(RuntimeException.class,
-                () -> assertPlan(TestCase.CASE_17B, 
isInstanceOf(IgniteRel.class), disableRules));
-        assertThat(e.getMessage(), containsString("There are not enough rules 
to produce a node with desired properties"));
+        assertPlan(TestCase.CASE_17A,
+                hasChildThat(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class)
+                        .and(input(1, 
isInstanceOf(IgniteReduceSortAggregate.class)
+                                
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
+                                        
.and(input(isInstanceOf(IgniteLimit.class)
+                                                
.and(input(isInstanceOf(IgniteExchange.class)
+                                                        
.and(hasDistribution(single()))
+                                                        
.and(input(isInstanceOf(IgniteSort.class)
+                                                                
.and(input(isTableScan("TEST")))
+                                                        ))
+                                                ))
+                                        ))
+                                ))
+                        ))
+                ),
+                disableRules
+        );
+        assertPlan(TestCase.CASE_17B,
+                hasChildThat(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class)
+                        .and(input(1, 
isInstanceOf(IgniteReduceSortAggregate.class)
+                                
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
+                                        
.and(input(isInstanceOf(IgniteLimit.class)
+                                                
.and(input(isInstanceOf(IgniteExchange.class)
+                                                        
.and(hasDistribution(single()))
+                                                        
.and(input(isInstanceOf(IgniteSort.class)
+                                                                
.and(input(isTableScan("TEST")))
+                                                        ))
+                                                ))
+                                        ))
+                                ))
+                        ))
+                ),
+                disableRules
+        );
     }
 
     /**
@@ -393,6 +435,7 @@ public class MapReduceSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
                                 .and(not(hasAggregate()))
                                 .and(hasGroups())
                                 .and(input(isInstanceOf(IgniteExchange.class)
+                                        .and(hasDistribution(single()))
                                         
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
                                                 .and(not(hasAggregate()))
                                                 .and(hasGroups())
@@ -425,6 +468,7 @@ public class MapReduceSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
         Predicate<RelNode> nonColocated = 
hasChildThat(isInstanceOf(IgniteReduceSortAggregate.class)
                 .and(in -> 
hasAggregates(countReduce).test(in.getAggregateCalls()))
                 .and(input(isInstanceOf(IgniteExchange.class)
+                        .and(hasDistribution(single()))
                         .and(input(isInstanceOf(IgniteMapSortAggregate.class)
                                         .and(in -> 
hasAggregates(countMap).test(in.getAggCallList()))
                                 )
@@ -469,7 +513,7 @@ public class MapReduceSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
                 .and(hasCollation(RelCollations.of(0)))
                 .and(input(isInstanceOf(IgniteExchange.class)
                         .and(hasCollation(RelCollations.of(0)))
-                        .and(hasDistribution(IgniteDistributions.single()))
+                        .and(hasDistribution(single()))
                         .and(input(isInstanceOf(IgniteMapSortAggregate.class)
                                 .and(hasCollation(RelCollations.of(1)))
                                 .and(hasGroupSets(Aggregate::getGroupSets, 1))
@@ -516,6 +560,7 @@ public class MapReduceSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
         assertPlan(testCase,
                 nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
                         .and(input(isInstanceOf(IgniteExchange.class)
+                                .and(hasDistribution(single()))
                                 
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
                                         .and(hasAggregate())
                                         .and(hasGroups())
@@ -546,6 +591,7 @@ public class MapReduceSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
         assertPlan(testCase,
                 nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
                         .and(input(isInstanceOf(IgniteExchange.class)
+                                .and(hasDistribution(single()))
                                 
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
                                         .and(hasAggregate())
                                         .and(hasGroups())
@@ -594,6 +640,7 @@ public class MapReduceSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
                                         .and(not(hasAggregate()))
                                         .and(hasGroups())
                                         
.and(input(isInstanceOf(IgniteExchange.class)
+                                                .and(hasDistribution(single()))
                                                 
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
                                                         
.and(not(hasAggregate()))
                                                         
.and(input(isInstanceOf(IgniteSort.class)
@@ -641,6 +688,7 @@ public class MapReduceSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
                                         .and(not(hasAggregate()))
                                         .and(hasGroups())
                                         
.and(input(isInstanceOf(IgniteExchange.class)
+                                                .and(hasDistribution(single()))
                                                 
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
                                                         
.and(not(hasAggregate()))
                                                         .and(hasGroups())
@@ -673,6 +721,7 @@ public class MapReduceSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
         assertPlan(testCase,
                 nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
                         .and(input(isInstanceOf(IgniteExchange.class)
+                                .and(hasDistribution(single()))
                                 
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
                                         .and(hasAggregate())
                                         .and(input(isIndexScan("TEST", 
"idx_grp0_grp1")))
@@ -706,6 +755,7 @@ public class MapReduceSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
                         .and(not(hasAggregate()))
                         .and(hasGroups())
                         .and(input(isInstanceOf(IgniteExchange.class)
+                                .and(hasDistribution(single()))
                                 
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
                                         .and(not(hasAggregate()))
                                         .and(hasGroups())
@@ -740,6 +790,7 @@ public class MapReduceSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
                         .and(not(hasAggregate()))
                         .and(hasGroups())
                         .and(input(isInstanceOf(IgniteExchange.class)
+                                .and(hasDistribution(single()))
                                 
.and(input(isInstanceOf(IgniteMapSortAggregate.class)
                                         .and(not(hasAggregate()))
                                         .and(hasGroups())
@@ -773,6 +824,7 @@ public class MapReduceSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
                                 // TODO: 
https://issues.apache.org/jira/browse/IGNITE-20095
                                 // Why can't Map be pushed down to under 
'exchange'.
                                 .and(input(isInstanceOf(IgniteExchange.class)
+                                        .and(hasDistribution(single()))
                                         
.and(input(isInstanceOf(IgniteSort.class)
                                                 .and(s -> 
s.collation().equals(collation))
                                                 
.and(input(isTableScan("TEST")))
@@ -814,6 +866,7 @@ public class MapReduceSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
                                                 // TODO: 
https://issues.apache.org/jira/browse/IGNITE-20095
                                                 // Why can't Map be pushed 
down to under 'exchange'.
                                                 
.and(input(isInstanceOf(IgniteExchange.class)
+                                                        
.and(hasDistribution(single()))
                                                         
.and(input(isInstanceOf(IgniteSort.class)
                                                                 .and(s -> 
s.collation().equals(collation))
                                                                 
.and(input(isTableScan("TEST")
@@ -832,7 +885,7 @@ public class MapReduceSortAggregatePlannerTest extends 
AbstractAggregatePlannerT
                 .and(hasGroupSets(IgniteReduceSortAggregate::getGroupSets, 0))
                 .and(hasCollation(RelCollations.of(0)))
                 .and(input(isInstanceOf(IgniteExchange.class)
-                        .and(hasDistribution(IgniteDistributions.single()))
+                        .and(hasDistribution(single()))
                         .and(hasCollation(RelCollations.of(0)))
                         .and(input(isInstanceOf(IgniteMapSortAggregate.class)
                                 .and(hasGroupSets(Aggregate::getGroupSets, 1))

Reply via email to