This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 9464ca13212 IGNITE-18341 SQL Calcite: Introduce correlated 
distribution - Fixes #10424.
9464ca13212 is described below

commit 9464ca13212de5dbcc72c6e1f9e2805e72d8157a
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Tue Dec 13 13:24:27 2022 +0300

    IGNITE-18341 SQL Calcite: Introduce correlated distribution - Fixes #10424.
    
    Signed-off-by: Aleksey Plekhanov <[email protected]>
---
 .../rel/IgniteCorrelatedNestedLoopJoin.java        | 29 ++++++++
 .../processors/query/calcite/rel/IgniteFilter.java | 83 ++++++++++++++++++++-
 .../rel/agg/IgniteColocatedAggregateBase.java      | 12 ++-
 .../calcite/rel/set/IgniteColocatedSetOp.java      |  6 +-
 .../query/calcite/trait/DistributionFunction.java  | 66 ++++++++++++++++
 .../query/calcite/trait/DistributionTrait.java     |  5 ++
 .../query/calcite/trait/IgniteDistributions.java   | 15 ++++
 .../processors/query/calcite/trait/TraitUtils.java |  4 +
 .../integration/CorrelatesIntegrationTest.java     | 21 ++++++
 .../planner/CorrelatedSubqueryPlannerTest.java     | 87 ++++++++++++++++++++++
 10 files changed, 321 insertions(+), 7 deletions(-)

diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteCorrelatedNestedLoopJoin.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteCorrelatedNestedLoopJoin.java
index b649a767e21..b71c7264b00 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteCorrelatedNestedLoopJoin.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteCorrelatedNestedLoopJoin.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.query.calcite.rel;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
@@ -28,6 +29,7 @@ import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelWriter;
@@ -40,6 +42,8 @@ import org.apache.calcite.util.Pair;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
 import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
@@ -168,6 +172,31 @@ public class IgniteCorrelatedNestedLoopJoin extends 
AbstractIgniteJoin {
             ImmutableList.of(left.replace(rewindability), 
right.replace(RewindabilityTrait.REWINDABLE)));
     }
 
+    /** {@inheritDoc} */
+    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> 
deriveDistribution(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits
+    ) {
+        List<Pair<RelTraitSet, List<RelTraitSet>>> traits = 
super.deriveDistribution(nodeTraits, inputTraits);
+
+        RelTraitSet leftTraits = inputTraits.get(0);
+        RelTraitSet rightTraits = inputTraits.get(1);
+
+        IgniteDistribution leftDistr = 
TraitUtils.distribution(inputTraits.get(0));
+
+        if (leftDistr.getType() == RelDistribution.Type.HASH_DISTRIBUTED && 
variablesSet.size() == 1) {
+            // Add artifitial correlated distribution which can be restored to 
hash distribution by the filter node.
+            traits = new ArrayList<>(traits);
+
+            traits.add(Pair.of(nodeTraits.replace(leftDistr),
+                ImmutableList.of(
+                    leftTraits,
+                    
rightTraits.replace(IgniteDistributions.correlated(F.first(variablesSet), 
leftDistr)))));
+        }
+
+        return traits;
+    }
+
     /** {@inheritDoc} */
     @Override public RelOptCost computeSelfCost(RelOptPlanner planner, 
RelMetadataQuery mq) {
         IgniteCostFactory costFactory = 
(IgniteCostFactory)planner.getCostFactory();
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
index 9e7425be695..207a1841bfe 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
@@ -19,23 +19,40 @@ package 
org.apache.ignite.internal.processors.query.calcite.rel;
 
 import java.util.List;
 import java.util.Set;
-
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.core.Filter;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexCorrelVariable;
+import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.rex.RexVisitor;
+import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
+import org.apache.calcite.util.mapping.Mapping;
+import org.apache.calcite.util.mapping.MappingType;
+import org.apache.calcite.util.mapping.Mappings;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionFunction;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
 import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.TraitsAwareIgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.processors.query.calcite.util.RexUtils;
 
 import static 
org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
@@ -124,6 +141,70 @@ public class IgniteFilter extends Filter implements 
TraitsAwareIgniteRel {
         return 
ImmutableList.of(Pair.of(nodeTraits.replace(CorrelationTrait.correlations(corrIds)),
 inTraits));
     }
 
+    /** {@inheritDoc} */
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
passThroughDistribution(RelTraitSet nodeTraits,
+        List<RelTraitSet> inTraits) {
+        IgniteDistribution distribution = TraitUtils.distribution(nodeTraits);
+
+        if (distribution.function().correlated()) {
+            // Check if filter contains condition with required correlate.
+            DistributionFunction.CorrelatedDistribution func = 
(DistributionFunction.CorrelatedDistribution)distribution.function();
+
+            RexVisitor<Void> visitor = new RexVisitorImpl<Void>(true) {
+                @Override public Void visitCorrelVariable(RexCorrelVariable 
variable) {
+                    if (variable.id.equals(func.correlationId()))
+                        throw new Util.FoundOne(variable);
+
+                    return null;
+                }
+            };
+
+            try {
+                condition.accept(visitor);
+            }
+            catch (Util.FoundOne corr) {
+                // Found required correlate.
+                IgniteDistribution corrDistr = func.target();
+
+                assert corrDistr.getType() == 
RelDistribution.Type.HASH_DISTRIBUTED;
+
+                // Remap correlate fields to input fields.
+                int corrFieldsCnt = 
((RexNode)corr.getNode()).getType().getFieldCount();
+                int inputFieldsCnt = getRowType().getFieldCount();
+                Mapping mapping = 
Mappings.create(MappingType.PARTIAL_FUNCTION, corrFieldsCnt, inputFieldsCnt);
+
+                List<RexNode> conds = 
RelOptUtil.conjunctions(RexUtil.toCnf(getCluster().getRexBuilder(), condition));
+
+                for (RexNode cond : conds) {
+                    if (cond instanceof RexCall && 
((RexCall)cond).getOperator().getKind() == SqlKind.EQUALS) {
+                        RexNode left = ((RexCall)cond).getOperands().get(0);
+                        RexNode right = ((RexCall)cond).getOperands().get(1);
+
+                        RexInputRef inputRef = left instanceof RexInputRef ? 
(RexInputRef)left :
+                            right instanceof RexInputRef ? (RexInputRef)right 
: null;
+
+                        RexFieldAccess fieldAccess = left instanceof 
RexFieldAccess ? (RexFieldAccess)left :
+                            right instanceof RexFieldAccess ? 
(RexFieldAccess)right : null;
+
+                        if (inputRef != null && fieldAccess != null &&
+                            fieldAccess.getReferenceExpr() instanceof 
RexCorrelVariable &&
+                            
((RexCorrelVariable)fieldAccess.getReferenceExpr()).id.equals(func.correlationId())
+                        )
+                            mapping.set(fieldAccess.getField().getIndex(), 
inputRef.getIndex());
+                    }
+                }
+
+                IgniteDistribution inputDistr = corrDistr.apply(mapping);
+
+                // Found all keys in filter conditions, replace correlated 
distribution with the real one.
+                if (inputDistr != IgniteDistributions.random())
+                    return Pair.of(nodeTraits, Commons.transform(inTraits, t 
-> t.replace(inputDistr)));
+            }
+        }
+
+        return Pair.of(nodeTraits, Commons.transform(inTraits, t -> 
t.replace(distribution)));
+    }
+
     /** {@inheritDoc} */
     @Override public RelOptCost computeSelfCost(RelOptPlanner planner, 
RelMetadataQuery mq) {
         double rowCount = mq.getRowCount(getInput());
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteColocatedAggregateBase.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteColocatedAggregateBase.java
index 905a822a8e3..087cf0de57d 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteColocatedAggregateBase.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteColocatedAggregateBase.java
@@ -57,10 +57,14 @@ public abstract class IgniteColocatedAggregateBase extends 
IgniteAggregate imple
     }
 
     /** {@inheritDoc} */
-    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
passThroughDistribution(RelTraitSet nodeTraits,
-        List<RelTraitSet> inTraits) {
-        if (TraitUtils.distribution(nodeTraits) == 
IgniteDistributions.single())
-            return Pair.of(nodeTraits, Commons.transform(inTraits, t -> 
t.replace(IgniteDistributions.single())));
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
passThroughDistribution(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inTraits
+    ) {
+        IgniteDistribution distr = TraitUtils.distribution(nodeTraits);
+
+        if (distr == IgniteDistributions.single() || 
distr.function().correlated())
+            return Pair.of(nodeTraits, Commons.transform(inTraits, t -> 
t.replace(distr)));
 
         return null;
     }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteColocatedSetOp.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteColocatedSetOp.java
index 362fdc5d4dc..b0bfce5f67e 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteColocatedSetOp.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteColocatedSetOp.java
@@ -56,8 +56,10 @@ public interface IgniteColocatedSetOp extends IgniteSetOp {
     /** {@inheritDoc} */
     @Override public default Pair<RelTraitSet, List<RelTraitSet>> 
passThroughDistribution(RelTraitSet nodeTraits,
         List<RelTraitSet> inTraits) {
-        if (TraitUtils.distribution(nodeTraits) == 
IgniteDistributions.single())
-            return Pair.of(nodeTraits, Commons.transform(inTraits, t -> 
t.replace(IgniteDistributions.single())));
+        IgniteDistribution distr = TraitUtils.distribution(nodeTraits);
+
+        if (distr == IgniteDistributions.single() || 
distr.function().correlated())
+            return Pair.of(nodeTraits, Commons.transform(inTraits, t -> 
t.replace(distr)));
 
         return null;
     }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
index 2d25b298adb..4191ef9083e 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
@@ -23,6 +23,7 @@ import java.util.Objects;
 import java.util.UUID;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.util.ImmutableIntList;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.AffinityService;
@@ -63,6 +64,11 @@ public abstract class DistributionFunction {
         return false;
     }
 
+    /** */
+    public boolean correlated() {
+        return false;
+    }
+
     /** */
     public int cacheId() {
         return CU.UNDEFINED_CACHE_ID;
@@ -141,6 +147,11 @@ public abstract class DistributionFunction {
         return new AffinityDistribution(cacheId, identity);
     }
 
+    /** */
+    public static DistributionFunction correlated(CorrelationId corrId, 
IgniteDistribution delegate) {
+        return new CorrelatedDistribution(corrId, delegate);
+    }
+
     /** */
     public static boolean satisfy(DistributionFunction f0, 
DistributionFunction f1) {
         if (f0 == f1 || f0.name() == f1.name())
@@ -316,4 +327,59 @@ public abstract class DistributionFunction {
             return "affinity[identity=" + identity + ", cacheId=" + cacheId + 
']';
         }
     }
+
+    /**
+     * Correlated distribution, used to bypass set of nodes on the right hand 
of CNLJ and to be restored to
+     * original hash distribution (with remapped keys) by the filter node.
+     */
+    public static final class CorrelatedDistribution extends 
DistributionFunction {
+        /** */
+        private final CorrelationId corrId;
+
+        /** */
+        private final IgniteDistribution target;
+
+        /** */
+        private CorrelatedDistribution(CorrelationId corrId, 
IgniteDistribution target) {
+            this.corrId = corrId;
+            this.target = target;
+
+            assert target.getType() == RelDistribution.Type.HASH_DISTRIBUTED : 
target.getType();
+        }
+
+        /** {@inheritDoc} */
+        @Override public RelDistribution.Type type() {
+            return RelDistribution.Type.RANDOM_DISTRIBUTED;
+        }
+
+        /** {@inheritDoc} */
+        @Override public <Row> Destination<Row> destination(
+            ExecutionContext<Row> ctx,
+            AffinityService affSrvc,
+            ColocationGroup target,
+            ImmutableIntList keys
+        ) {
+            throw new AssertionError("Correlated distribution should be 
converted to delegate before using");
+        }
+
+        /** */
+        public CorrelationId correlationId() {
+            return corrId;
+        }
+
+        /** */
+        public IgniteDistribution target() {
+            return target;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean correlated() {
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected String name0() {
+            return "correlated[corrId=" + corrId + ", target=" + target + ']';
+        }
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
index 9e16870cd15..e0d7dc6443e 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
@@ -136,6 +136,11 @@ public final class DistributionTrait implements 
IgniteDistribution {
         if (other.getType() == ANY)
             return true;
 
+        if (function.correlated() || other.function.correlated()) {
+            return function.correlated() && other.function.correlated() &&
+                DistributionFunction.satisfy(function, other.function);
+        }
+
         if (getType() == other.getType())
             return getType() != HASH_DISTRIBUTED
                 || (Objects.equals(keys, other.keys)
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
index 8af167b083b..7209cfaccc9 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
@@ -19,6 +19,7 @@ package 
org.apache.ignite.internal.processors.query.calcite.trait;
 
 import java.util.List;
 import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 
@@ -113,6 +114,20 @@ public class IgniteDistributions {
         return canonize(new DistributionTrait(ImmutableIntList.copyOf(keys), 
function));
     }
 
+    /**
+     * Creates correlated distribution, thats used to bypass set of nodes on 
the right hand of CNLJ in cases when
+     * hash distribution cannot bypass these nodes.
+     * It's an proxy for hash distribution. Nodes can't be enforced to this 
distribution. Original hash distribution
+     * (with remapped keys) thats can be used in final plan is restored by the 
filter node.
+     *
+     * @param corrId Target distribution correlation id.
+     * @param target Target distribution.
+     * @return Distribution by correlate.
+     */
+    public static IgniteDistribution correlated(CorrelationId corrId, 
IgniteDistribution target) {
+        return canonize(new 
DistributionTrait(DistributionFunction.correlated(corrId, target)));
+    }
+
     /**
      * See {@link RelTraitDef#canonize(org.apache.calcite.plan.RelTrait)}.
      */
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
index bc9a0a81005..408a5523230 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
@@ -145,6 +145,10 @@ public class TraitUtils {
         if (fromTrait.satisfies(toTrait))
             return rel;
 
+        // Cannot enforce node to correlated distribution, this distribution 
is only set by trait propagation.
+        if (toTrait.function().correlated())
+            return null;
+
         // right now we cannot create a multi-column affinity
         // key object, thus this conversion is impossible
         if (toTrait.function().affinity() && toTrait.getKeys().size() > 1)
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/CorrelatesIntegrationTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/CorrelatesIntegrationTest.java
index f8808a3ff44..3e144df2800 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/CorrelatesIntegrationTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/CorrelatesIntegrationTest.java
@@ -76,4 +76,25 @@ public class CorrelatesIntegrationTest extends 
AbstractBasicIntegrationTest {
             .returns(12, 2)
             .check();
     }
+
+    /**
+     * Tests colocated join possible with the help of correlated distribution.
+     */
+    @Test
+    public void testCorrelatedDistribution() {
+        sql("CREATE TABLE dept(deptid INTEGER, name VARCHAR, PRIMARY 
KEY(deptid))");
+        sql("CREATE TABLE emp(empid INTEGER, deptid INTEGER, name VARCHAR, 
PRIMARY KEY(empid, deptid)) " +
+            "WITH AFFINITY_KEY=deptid");
+
+        sql("INSERT INTO dept VALUES (0, 'dept0'), (1, 'dept1'), (2, 
'dept2')");
+        sql("INSERT INTO emp VALUES (0, 0, 'emp0'), (1, 0, 'emp1'), (2, 0, 
'emp2'), " +
+            "(3, 2, 'emp3'), (4, 2, 'emp4'), (5, 3, 'emp5')");
+
+        assertQuery("SELECT deptid, (SELECT COUNT(*) FROM emp WHERE emp.deptid 
= dept.deptid) FROM dept")
+            .matches(QueryChecker.containsSubPlan("IgniteColocated"))
+            .returns(0, 3L)
+            .returns(1, 0L)
+            .returns(2, 2L)
+            .check();
+    }
 }
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/CorrelatedSubqueryPlannerTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/CorrelatedSubqueryPlannerTest.java
index 98c16c7dfaa..6c998501ce3 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/CorrelatedSubqueryPlannerTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/CorrelatedSubqueryPlannerTest.java
@@ -17,8 +17,10 @@
 
 package org.apache.ignite.internal.processors.query.calcite.planner;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.function.Predicate;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Filter;
 import org.apache.calcite.rel.core.Project;
@@ -34,8 +36,11 @@ import 
org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner
 import 
org.apache.ignite.internal.processors.query.calcite.prepare.PlannerPhase;
 import 
org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
 import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedNestedLoopJoin;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteColocatedAggregateBase;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
 import org.apache.ignite.internal.processors.query.calcite.util.RexUtils;
@@ -225,6 +230,88 @@ public class CorrelatedSubqueryPlannerTest extends 
AbstractPlannerTest {
         }
     }
 
+    /**
+     * Test correlated distribution bypass set of nodes.
+     */
+    @Test
+    public void testCorrelatedDistribution() throws Exception {
+        IgniteSchema schema = createSchema(
+            createTable("TA1", IgniteDistributions.affinity(0, "cache1", 
"hash"),
+                "A", Integer.class, "B", Integer.class, "C", Integer.class),
+            createTable("TA2", IgniteDistributions.affinity(1, "cache2", 
"hash"),
+                "A", Integer.class, "B", Integer.class, "C", Integer.class),
+            createTable("TH1", IgniteDistributions.hash(Arrays.asList(0, 1)),
+                "A", Integer.class, "B", Integer.class, "C", Integer.class),
+            createTable("TH2", IgniteDistributions.hash(Arrays.asList(1, 2)),
+                "A", Integer.class, "B", Integer.class, "C", Integer.class),
+            createTable("TH3", IgniteDistributions.hash(Arrays.asList(0, 2)),
+                "A", Integer.class, "B", Integer.class, "C", Integer.class)
+        );
+
+        Predicate<RelNode> colocatedPredicate = 
hasChildThat(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class)
+            .and(input(0, isInstanceOf(IgniteTableScan.class)))
+            .and(input(1, isInstanceOf(IgniteColocatedAggregateBase.class)
+                .and(hasChildThat(isInstanceOf(IgniteExchange.class)).negate())
+            )));
+
+        // Affinity distribution.
+        String sql = "SELECT a FROM ta1 WHERE EXISTS (SELECT a FROM ta2 WHERE 
ta2.b = ta1.a)";
+        assertPlan(sql, schema, colocatedPredicate);
+
+        // Hash distribution on two columns.
+        sql = "SELECT a FROM th1 WHERE EXISTS (SELECT a FROM th2 WHERE th2.b = 
th1.a AND th2.c = th1.b)";
+        assertPlan(sql, schema, colocatedPredicate);
+
+        sql = "SELECT a FROM th1 WHERE EXISTS (SELECT a FROM th3 WHERE th3.a = 
th1.a AND th3.c = th1.b)";
+        assertPlan(sql, schema, colocatedPredicate);
+
+        // Additional AND condition in filter.
+        sql = "SELECT a FROM th1 WHERE EXISTS (SELECT a FROM th2 WHERE th2.b = 
th1.a AND th2.c = th1.b AND th2.a = 1)";
+        assertPlan(sql, schema, colocatedPredicate);
+
+        // Aggregate with affinity distribution.
+        sql = "SELECT (SELECT sum(a) FROM ta2 WHERE ta2.b = ta1.a) FROM ta1";
+        assertPlan(sql, schema, colocatedPredicate);
+
+        // Aggregate with set op with hash distribution on two columns.
+        sql = "SELECT (SELECT sum(a) FROM (" +
+            "   SELECT a FROM th2 WHERE th2.b = th1.a AND th2.c = th1.b " +
+            "   INTERSECT " +
+            "   SELECT a FROM th3 WHERE th3.a = th1.a AND th3.c = th1.b" +
+            ")) FROM th1";
+        assertPlan(sql, schema, colocatedPredicate);
+
+        // Condition on not colocated column.
+        sql = "SELECT a FROM ta1 WHERE EXISTS (SELECT a FROM ta2 WHERE ta2.a = 
ta1.a)";
+        assertPlan(sql, schema, colocatedPredicate.negate());
+
+        // Additional OR condition in filter.
+        sql = "SELECT a FROM th1 WHERE EXISTS (SELECT a FROM th2 WHERE th2.b = 
th1.a AND th2.c = th1.b OR th2.a = 1)";
+        assertPlan(sql, schema, colocatedPredicate.negate());
+
+        // Not full set of hash distribution keys.
+        sql = "SELECT a FROM th1 WHERE EXISTS (SELECT a FROM th2 WHERE th2.a = 
th1.a AND th2.c = th1.b)";
+        assertPlan(sql, schema, colocatedPredicate.negate());
+
+        sql = "SELECT a FROM th1 WHERE EXISTS (SELECT a FROM th2 WHERE th2.b = 
1 AND th2.c = th1.b)";
+        assertPlan(sql, schema, colocatedPredicate.negate());
+
+        sql = "SELECT a FROM th1 WHERE EXISTS (SELECT a FROM th3 WHERE th3.a = 
th1.a AND th3.c = th1.c)";
+        assertPlan(sql, schema, colocatedPredicate.negate());
+
+        // Wrong order of hash distribution keys.
+        sql = "SELECT a FROM th1 WHERE EXISTS (SELECT a FROM th2 WHERE th2.c = 
th1.a AND th2.b = th1.b)";
+        assertPlan(sql, schema, colocatedPredicate.negate());
+
+        // One input of set-op has not full set of hash distribution keys.
+        sql = "SELECT (SELECT sum(a) FROM (" +
+            "   SELECT a FROM th2 WHERE th2.a = th1.a AND th2.c = th1.b " +
+            "   INTERSECT " +
+            "   SELECT a FROM th3 WHERE th3.a = th1.a AND th3.c = th1.b" +
+            ")) FROM th1";
+        assertPlan(sql, schema, colocatedPredicate.negate());
+    }
+
     /** */
     private RelNode convertSubQueries(IgnitePlanner planner, PlanningContext 
ctx) throws Exception {
         // Parse and validate.

Reply via email to