This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 9464ca13212 IGNITE-18341 SQL Calcite: Introduce correlated
distribution - Fixes #10424.
9464ca13212 is described below
commit 9464ca13212de5dbcc72c6e1f9e2805e72d8157a
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Tue Dec 13 13:24:27 2022 +0300
IGNITE-18341 SQL Calcite: Introduce correlated distribution - Fixes #10424.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../rel/IgniteCorrelatedNestedLoopJoin.java | 29 ++++++++
.../processors/query/calcite/rel/IgniteFilter.java | 83 ++++++++++++++++++++-
.../rel/agg/IgniteColocatedAggregateBase.java | 12 ++-
.../calcite/rel/set/IgniteColocatedSetOp.java | 6 +-
.../query/calcite/trait/DistributionFunction.java | 66 ++++++++++++++++
.../query/calcite/trait/DistributionTrait.java | 5 ++
.../query/calcite/trait/IgniteDistributions.java | 15 ++++
.../processors/query/calcite/trait/TraitUtils.java | 4 +
.../integration/CorrelatesIntegrationTest.java | 21 ++++++
.../planner/CorrelatedSubqueryPlannerTest.java | 87 ++++++++++++++++++++++
10 files changed, 321 insertions(+), 7 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteCorrelatedNestedLoopJoin.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteCorrelatedNestedLoopJoin.java
index b649a767e21..b71c7264b00 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteCorrelatedNestedLoopJoin.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteCorrelatedNestedLoopJoin.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.query.calcite.rel;
+import java.util.ArrayList;
import java.util.List;
import java.util.Set;
@@ -28,6 +29,7 @@ import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelInput;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelWriter;
@@ -40,6 +42,8 @@ import org.apache.calcite.util.Pair;
import
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
import
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
import
org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
+import
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
import
org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
@@ -168,6 +172,31 @@ public class IgniteCorrelatedNestedLoopJoin extends
AbstractIgniteJoin {
ImmutableList.of(left.replace(rewindability),
right.replace(RewindabilityTrait.REWINDABLE)));
}
+ /** {@inheritDoc} */
+ @Override public List<Pair<RelTraitSet, List<RelTraitSet>>>
deriveDistribution(
+ RelTraitSet nodeTraits,
+ List<RelTraitSet> inputTraits
+ ) {
+ List<Pair<RelTraitSet, List<RelTraitSet>>> traits =
super.deriveDistribution(nodeTraits, inputTraits);
+
+ RelTraitSet leftTraits = inputTraits.get(0);
+ RelTraitSet rightTraits = inputTraits.get(1);
+
+ IgniteDistribution leftDistr =
TraitUtils.distribution(inputTraits.get(0));
+
+ if (leftDistr.getType() == RelDistribution.Type.HASH_DISTRIBUTED &&
variablesSet.size() == 1) {
+ // Add artifitial correlated distribution which can be restored to
hash distribution by the filter node.
+ traits = new ArrayList<>(traits);
+
+ traits.add(Pair.of(nodeTraits.replace(leftDistr),
+ ImmutableList.of(
+ leftTraits,
+
rightTraits.replace(IgniteDistributions.correlated(F.first(variablesSet),
leftDistr)))));
+ }
+
+ return traits;
+ }
+
/** {@inheritDoc} */
@Override public RelOptCost computeSelfCost(RelOptPlanner planner,
RelMetadataQuery mq) {
IgniteCostFactory costFactory =
(IgniteCostFactory)planner.getCostFactory();
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
index 9e7425be695..207a1841bfe 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
@@ -19,23 +19,40 @@ package
org.apache.ignite.internal.processors.query.calcite.rel;
import java.util.List;
import java.util.Set;
-
import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelInput;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexCorrelVariable;
+import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.rex.RexVisitor;
+import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
+import org.apache.calcite.util.mapping.Mapping;
+import org.apache.calcite.util.mapping.MappingType;
+import org.apache.calcite.util.mapping.Mappings;
import
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
import
org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
+import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionFunction;
+import
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
import
org.apache.ignite.internal.processors.query.calcite.trait.TraitsAwareIgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.processors.query.calcite.util.RexUtils;
import static
org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
@@ -124,6 +141,70 @@ public class IgniteFilter extends Filter implements
TraitsAwareIgniteRel {
return
ImmutableList.of(Pair.of(nodeTraits.replace(CorrelationTrait.correlations(corrIds)),
inTraits));
}
+ /** {@inheritDoc} */
+ @Override public Pair<RelTraitSet, List<RelTraitSet>>
passThroughDistribution(RelTraitSet nodeTraits,
+ List<RelTraitSet> inTraits) {
+ IgniteDistribution distribution = TraitUtils.distribution(nodeTraits);
+
+ if (distribution.function().correlated()) {
+ // Check if filter contains condition with required correlate.
+ DistributionFunction.CorrelatedDistribution func =
(DistributionFunction.CorrelatedDistribution)distribution.function();
+
+ RexVisitor<Void> visitor = new RexVisitorImpl<Void>(true) {
+ @Override public Void visitCorrelVariable(RexCorrelVariable
variable) {
+ if (variable.id.equals(func.correlationId()))
+ throw new Util.FoundOne(variable);
+
+ return null;
+ }
+ };
+
+ try {
+ condition.accept(visitor);
+ }
+ catch (Util.FoundOne corr) {
+ // Found required correlate.
+ IgniteDistribution corrDistr = func.target();
+
+ assert corrDistr.getType() ==
RelDistribution.Type.HASH_DISTRIBUTED;
+
+ // Remap correlate fields to input fields.
+ int corrFieldsCnt =
((RexNode)corr.getNode()).getType().getFieldCount();
+ int inputFieldsCnt = getRowType().getFieldCount();
+ Mapping mapping =
Mappings.create(MappingType.PARTIAL_FUNCTION, corrFieldsCnt, inputFieldsCnt);
+
+ List<RexNode> conds =
RelOptUtil.conjunctions(RexUtil.toCnf(getCluster().getRexBuilder(), condition));
+
+ for (RexNode cond : conds) {
+ if (cond instanceof RexCall &&
((RexCall)cond).getOperator().getKind() == SqlKind.EQUALS) {
+ RexNode left = ((RexCall)cond).getOperands().get(0);
+ RexNode right = ((RexCall)cond).getOperands().get(1);
+
+ RexInputRef inputRef = left instanceof RexInputRef ?
(RexInputRef)left :
+ right instanceof RexInputRef ? (RexInputRef)right
: null;
+
+ RexFieldAccess fieldAccess = left instanceof
RexFieldAccess ? (RexFieldAccess)left :
+ right instanceof RexFieldAccess ?
(RexFieldAccess)right : null;
+
+ if (inputRef != null && fieldAccess != null &&
+ fieldAccess.getReferenceExpr() instanceof
RexCorrelVariable &&
+
((RexCorrelVariable)fieldAccess.getReferenceExpr()).id.equals(func.correlationId())
+ )
+ mapping.set(fieldAccess.getField().getIndex(),
inputRef.getIndex());
+ }
+ }
+
+ IgniteDistribution inputDistr = corrDistr.apply(mapping);
+
+ // Found all keys in filter conditions, replace correlated
distribution with the real one.
+ if (inputDistr != IgniteDistributions.random())
+ return Pair.of(nodeTraits, Commons.transform(inTraits, t
-> t.replace(inputDistr)));
+ }
+ }
+
+ return Pair.of(nodeTraits, Commons.transform(inTraits, t ->
t.replace(distribution)));
+ }
+
/** {@inheritDoc} */
@Override public RelOptCost computeSelfCost(RelOptPlanner planner,
RelMetadataQuery mq) {
double rowCount = mq.getRowCount(getInput());
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteColocatedAggregateBase.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteColocatedAggregateBase.java
index 905a822a8e3..087cf0de57d 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteColocatedAggregateBase.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteColocatedAggregateBase.java
@@ -57,10 +57,14 @@ public abstract class IgniteColocatedAggregateBase extends
IgniteAggregate imple
}
/** {@inheritDoc} */
- @Override public Pair<RelTraitSet, List<RelTraitSet>>
passThroughDistribution(RelTraitSet nodeTraits,
- List<RelTraitSet> inTraits) {
- if (TraitUtils.distribution(nodeTraits) ==
IgniteDistributions.single())
- return Pair.of(nodeTraits, Commons.transform(inTraits, t ->
t.replace(IgniteDistributions.single())));
+ @Override public Pair<RelTraitSet, List<RelTraitSet>>
passThroughDistribution(
+ RelTraitSet nodeTraits,
+ List<RelTraitSet> inTraits
+ ) {
+ IgniteDistribution distr = TraitUtils.distribution(nodeTraits);
+
+ if (distr == IgniteDistributions.single() ||
distr.function().correlated())
+ return Pair.of(nodeTraits, Commons.transform(inTraits, t ->
t.replace(distr)));
return null;
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteColocatedSetOp.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteColocatedSetOp.java
index 362fdc5d4dc..b0bfce5f67e 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteColocatedSetOp.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteColocatedSetOp.java
@@ -56,8 +56,10 @@ public interface IgniteColocatedSetOp extends IgniteSetOp {
/** {@inheritDoc} */
@Override public default Pair<RelTraitSet, List<RelTraitSet>>
passThroughDistribution(RelTraitSet nodeTraits,
List<RelTraitSet> inTraits) {
- if (TraitUtils.distribution(nodeTraits) ==
IgniteDistributions.single())
- return Pair.of(nodeTraits, Commons.transform(inTraits, t ->
t.replace(IgniteDistributions.single())));
+ IgniteDistribution distr = TraitUtils.distribution(nodeTraits);
+
+ if (distr == IgniteDistributions.single() ||
distr.function().correlated())
+ return Pair.of(nodeTraits, Commons.transform(inTraits, t ->
t.replace(distr)));
return null;
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
index 2d25b298adb..4191ef9083e 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
@@ -23,6 +23,7 @@ import java.util.Objects;
import java.util.UUID;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.util.ImmutableIntList;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import
org.apache.ignite.internal.processors.query.calcite.metadata.AffinityService;
@@ -63,6 +64,11 @@ public abstract class DistributionFunction {
return false;
}
+ /** */
+ public boolean correlated() {
+ return false;
+ }
+
/** */
public int cacheId() {
return CU.UNDEFINED_CACHE_ID;
@@ -141,6 +147,11 @@ public abstract class DistributionFunction {
return new AffinityDistribution(cacheId, identity);
}
+ /** */
+ public static DistributionFunction correlated(CorrelationId corrId,
IgniteDistribution delegate) {
+ return new CorrelatedDistribution(corrId, delegate);
+ }
+
/** */
public static boolean satisfy(DistributionFunction f0,
DistributionFunction f1) {
if (f0 == f1 || f0.name() == f1.name())
@@ -316,4 +327,59 @@ public abstract class DistributionFunction {
return "affinity[identity=" + identity + ", cacheId=" + cacheId +
']';
}
}
+
+ /**
+ * Correlated distribution, used to bypass set of nodes on the right hand
of CNLJ and to be restored to
+ * original hash distribution (with remapped keys) by the filter node.
+ */
+ public static final class CorrelatedDistribution extends
DistributionFunction {
+ /** */
+ private final CorrelationId corrId;
+
+ /** */
+ private final IgniteDistribution target;
+
+ /** */
+ private CorrelatedDistribution(CorrelationId corrId,
IgniteDistribution target) {
+ this.corrId = corrId;
+ this.target = target;
+
+ assert target.getType() == RelDistribution.Type.HASH_DISTRIBUTED :
target.getType();
+ }
+
+ /** {@inheritDoc} */
+ @Override public RelDistribution.Type type() {
+ return RelDistribution.Type.RANDOM_DISTRIBUTED;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <Row> Destination<Row> destination(
+ ExecutionContext<Row> ctx,
+ AffinityService affSrvc,
+ ColocationGroup target,
+ ImmutableIntList keys
+ ) {
+ throw new AssertionError("Correlated distribution should be
converted to delegate before using");
+ }
+
+ /** */
+ public CorrelationId correlationId() {
+ return corrId;
+ }
+
+ /** */
+ public IgniteDistribution target() {
+ return target;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean correlated() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected String name0() {
+ return "correlated[corrId=" + corrId + ", target=" + target + ']';
+ }
+ }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
index 9e16870cd15..e0d7dc6443e 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
@@ -136,6 +136,11 @@ public final class DistributionTrait implements
IgniteDistribution {
if (other.getType() == ANY)
return true;
+ if (function.correlated() || other.function.correlated()) {
+ return function.correlated() && other.function.correlated() &&
+ DistributionFunction.satisfy(function, other.function);
+ }
+
if (getType() == other.getType())
return getType() != HASH_DISTRIBUTED
|| (Objects.equals(keys, other.keys)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
index 8af167b083b..7209cfaccc9 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
@@ -19,6 +19,7 @@ package
org.apache.ignite.internal.processors.query.calcite.trait;
import java.util.List;
import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -113,6 +114,20 @@ public class IgniteDistributions {
return canonize(new DistributionTrait(ImmutableIntList.copyOf(keys),
function));
}
+ /**
+ * Creates correlated distribution, thats used to bypass set of nodes on
the right hand of CNLJ in cases when
+ * hash distribution cannot bypass these nodes.
+ * It's an proxy for hash distribution. Nodes can't be enforced to this
distribution. Original hash distribution
+ * (with remapped keys) thats can be used in final plan is restored by the
filter node.
+ *
+ * @param corrId Target distribution correlation id.
+ * @param target Target distribution.
+ * @return Distribution by correlate.
+ */
+ public static IgniteDistribution correlated(CorrelationId corrId,
IgniteDistribution target) {
+ return canonize(new
DistributionTrait(DistributionFunction.correlated(corrId, target)));
+ }
+
/**
* See {@link RelTraitDef#canonize(org.apache.calcite.plan.RelTrait)}.
*/
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
index bc9a0a81005..408a5523230 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
@@ -145,6 +145,10 @@ public class TraitUtils {
if (fromTrait.satisfies(toTrait))
return rel;
+ // Cannot enforce node to correlated distribution, this distribution
is only set by trait propagation.
+ if (toTrait.function().correlated())
+ return null;
+
// right now we cannot create a multi-column affinity
// key object, thus this conversion is impossible
if (toTrait.function().affinity() && toTrait.getKeys().size() > 1)
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/CorrelatesIntegrationTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/CorrelatesIntegrationTest.java
index f8808a3ff44..3e144df2800 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/CorrelatesIntegrationTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/CorrelatesIntegrationTest.java
@@ -76,4 +76,25 @@ public class CorrelatesIntegrationTest extends
AbstractBasicIntegrationTest {
.returns(12, 2)
.check();
}
+
+ /**
+ * Tests colocated join possible with the help of correlated distribution.
+ */
+ @Test
+ public void testCorrelatedDistribution() {
+ sql("CREATE TABLE dept(deptid INTEGER, name VARCHAR, PRIMARY
KEY(deptid))");
+ sql("CREATE TABLE emp(empid INTEGER, deptid INTEGER, name VARCHAR,
PRIMARY KEY(empid, deptid)) " +
+ "WITH AFFINITY_KEY=deptid");
+
+ sql("INSERT INTO dept VALUES (0, 'dept0'), (1, 'dept1'), (2,
'dept2')");
+ sql("INSERT INTO emp VALUES (0, 0, 'emp0'), (1, 0, 'emp1'), (2, 0,
'emp2'), " +
+ "(3, 2, 'emp3'), (4, 2, 'emp4'), (5, 3, 'emp5')");
+
+ assertQuery("SELECT deptid, (SELECT COUNT(*) FROM emp WHERE emp.deptid
= dept.deptid) FROM dept")
+ .matches(QueryChecker.containsSubPlan("IgniteColocated"))
+ .returns(0, 3L)
+ .returns(1, 0L)
+ .returns(2, 2L)
+ .check();
+ }
}
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/CorrelatedSubqueryPlannerTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/CorrelatedSubqueryPlannerTest.java
index 98c16c7dfaa..6c998501ce3 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/CorrelatedSubqueryPlannerTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/CorrelatedSubqueryPlannerTest.java
@@ -17,8 +17,10 @@
package org.apache.ignite.internal.processors.query.calcite.planner;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.function.Predicate;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.Project;
@@ -34,8 +36,11 @@ import
org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner
import
org.apache.ignite.internal.processors.query.calcite.prepare.PlannerPhase;
import
org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedNestedLoopJoin;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import
org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteColocatedAggregateBase;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
import
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
import org.apache.ignite.internal.processors.query.calcite.util.RexUtils;
@@ -225,6 +230,88 @@ public class CorrelatedSubqueryPlannerTest extends
AbstractPlannerTest {
}
}
+ /**
+ * Test correlated distribution bypass set of nodes.
+ */
+ @Test
+ public void testCorrelatedDistribution() throws Exception {
+ IgniteSchema schema = createSchema(
+ createTable("TA1", IgniteDistributions.affinity(0, "cache1",
"hash"),
+ "A", Integer.class, "B", Integer.class, "C", Integer.class),
+ createTable("TA2", IgniteDistributions.affinity(1, "cache2",
"hash"),
+ "A", Integer.class, "B", Integer.class, "C", Integer.class),
+ createTable("TH1", IgniteDistributions.hash(Arrays.asList(0, 1)),
+ "A", Integer.class, "B", Integer.class, "C", Integer.class),
+ createTable("TH2", IgniteDistributions.hash(Arrays.asList(1, 2)),
+ "A", Integer.class, "B", Integer.class, "C", Integer.class),
+ createTable("TH3", IgniteDistributions.hash(Arrays.asList(0, 2)),
+ "A", Integer.class, "B", Integer.class, "C", Integer.class)
+ );
+
+ Predicate<RelNode> colocatedPredicate =
hasChildThat(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class)
+ .and(input(0, isInstanceOf(IgniteTableScan.class)))
+ .and(input(1, isInstanceOf(IgniteColocatedAggregateBase.class)
+ .and(hasChildThat(isInstanceOf(IgniteExchange.class)).negate())
+ )));
+
+ // Affinity distribution.
+ String sql = "SELECT a FROM ta1 WHERE EXISTS (SELECT a FROM ta2 WHERE
ta2.b = ta1.a)";
+ assertPlan(sql, schema, colocatedPredicate);
+
+ // Hash distribution on two columns.
+ sql = "SELECT a FROM th1 WHERE EXISTS (SELECT a FROM th2 WHERE th2.b =
th1.a AND th2.c = th1.b)";
+ assertPlan(sql, schema, colocatedPredicate);
+
+ sql = "SELECT a FROM th1 WHERE EXISTS (SELECT a FROM th3 WHERE th3.a =
th1.a AND th3.c = th1.b)";
+ assertPlan(sql, schema, colocatedPredicate);
+
+ // Additional AND condition in filter.
+ sql = "SELECT a FROM th1 WHERE EXISTS (SELECT a FROM th2 WHERE th2.b =
th1.a AND th2.c = th1.b AND th2.a = 1)";
+ assertPlan(sql, schema, colocatedPredicate);
+
+ // Aggregate with affinity distribution.
+ sql = "SELECT (SELECT sum(a) FROM ta2 WHERE ta2.b = ta1.a) FROM ta1";
+ assertPlan(sql, schema, colocatedPredicate);
+
+ // Aggregate with set op with hash distribution on two columns.
+ sql = "SELECT (SELECT sum(a) FROM (" +
+ " SELECT a FROM th2 WHERE th2.b = th1.a AND th2.c = th1.b " +
+ " INTERSECT " +
+ " SELECT a FROM th3 WHERE th3.a = th1.a AND th3.c = th1.b" +
+ ")) FROM th1";
+ assertPlan(sql, schema, colocatedPredicate);
+
+ // Condition on not colocated column.
+ sql = "SELECT a FROM ta1 WHERE EXISTS (SELECT a FROM ta2 WHERE ta2.a =
ta1.a)";
+ assertPlan(sql, schema, colocatedPredicate.negate());
+
+ // Additional OR condition in filter.
+ sql = "SELECT a FROM th1 WHERE EXISTS (SELECT a FROM th2 WHERE th2.b =
th1.a AND th2.c = th1.b OR th2.a = 1)";
+ assertPlan(sql, schema, colocatedPredicate.negate());
+
+ // Not full set of hash distribution keys.
+ sql = "SELECT a FROM th1 WHERE EXISTS (SELECT a FROM th2 WHERE th2.a =
th1.a AND th2.c = th1.b)";
+ assertPlan(sql, schema, colocatedPredicate.negate());
+
+ sql = "SELECT a FROM th1 WHERE EXISTS (SELECT a FROM th2 WHERE th2.b =
1 AND th2.c = th1.b)";
+ assertPlan(sql, schema, colocatedPredicate.negate());
+
+ sql = "SELECT a FROM th1 WHERE EXISTS (SELECT a FROM th3 WHERE th3.a =
th1.a AND th3.c = th1.c)";
+ assertPlan(sql, schema, colocatedPredicate.negate());
+
+ // Wrong order of hash distribution keys.
+ sql = "SELECT a FROM th1 WHERE EXISTS (SELECT a FROM th2 WHERE th2.c =
th1.a AND th2.b = th1.b)";
+ assertPlan(sql, schema, colocatedPredicate.negate());
+
+ // One input of set-op has not full set of hash distribution keys.
+ sql = "SELECT (SELECT sum(a) FROM (" +
+ " SELECT a FROM th2 WHERE th2.a = th1.a AND th2.c = th1.b " +
+ " INTERSECT " +
+ " SELECT a FROM th3 WHERE th3.a = th1.a AND th3.c = th1.b" +
+ ")) FROM th1";
+ assertPlan(sql, schema, colocatedPredicate.negate());
+ }
+
/** */
private RelNode convertSubQueries(IgnitePlanner planner, PlanningContext
ctx) throws Exception {
// Parse and validate.