This is an automated email from the ASF dual-hosted git repository.
zstan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 15ebaffa3 IGNITE-16761 Sql. Join with correlated subqueries returns
wrong result - Fixes #756.
15ebaffa3 is described below
commit 15ebaffa3c3e0cade2364beef739a848dfc749fd
Author: zstan <[email protected]>
AuthorDate: Wed Apr 6 17:20:54 2022 +0300
IGNITE-16761 Sql. Join with correlated subqueries returns wrong result -
Fixes #756.
Signed-off-by: zstan <[email protected]>
---
.../internal/sql/engine/ItCorrelatesTest.java | 25 ++++
.../internal/sql/engine/prepare/IgnitePlanner.java | 134 +++++++++++++++++
.../internal/sql/engine/prepare/PlannerHelper.java | 2 +
.../planner/CorrelatedSubqueryPlannerTest.java | 161 +++++++++++++++++++++
4 files changed, 322 insertions(+)
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java
index 0da0c503a..0a7475d95 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java
@@ -74,4 +74,29 @@ public class ItCorrelatesTest extends
AbstractBasicIntegrationTest {
.returns(4)
.check();
}
+
+ /**
+ * Tests resolving of collisions in correlates.
+ */
+ @Test
+ public void testCorrelatesCollision() {
+ sql("CREATE TABLE test1 (a INTEGER PRIMARY KEY, b INTEGER)");
+ sql("INSERT INTO test1 VALUES (11, 1), (12, 2), (13, 3)");
+ sql("CREATE TABLE test2 (a INTEGER PRIMARY KEY, c INTEGER)");
+ sql("INSERT INTO test2 VALUES (11, 1), (12, 1), (13, 4)");
+
+ // Collision by correlate variables in the left hand.
+ assertQuery("SELECT * FROM test1 WHERE "
+ + "EXISTS(SELECT * FROM test2 WHERE test1.a=test2.a AND
test1.b<>test2.c) "
+ + "AND NOT EXISTS(SELECT * FROM test2 WHERE test1.a=test2.a
AND test1.b<test2.c)")
+ .returns(12, 2)
+ .check();
+
+ // Collision by correlate variables in both, left and right hands.
+ assertQuery("SELECT * FROM test1 WHERE "
+ + "EXISTS(SELECT * FROM test2 WHERE (SELECT test1.a)=test2.a
AND (SELECT test1.b)<>test2.c) "
+ + "AND NOT EXISTS(SELECT * FROM test2 WHERE (SELECT
test1.a)=test2.a AND (SELECT test1.b)<test2.c)")
+ .returns(12, 2)
+ .check();
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgnitePlanner.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgnitePlanner.java
index ce0ffb00b..b1695d071 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgnitePlanner.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgnitePlanner.java
@@ -21,7 +21,10 @@ import java.io.PrintWriter;
import java.io.Reader;
import java.io.StringWriter;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.calcite.plan.Context;
import org.apache.calcite.plan.RelOptCluster;
@@ -37,13 +40,21 @@ import org.apache.calcite.plan.RelTraitDef;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.plan.volcano.VolcanoPlanner;
import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.RelHomogeneousShuttle;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.RelShuttle;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.logical.LogicalCorrelate;
+import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.metadata.CachingRelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCorrelVariable;
import org.apache.calcite.rex.RexExecutor;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
import org.apache.calcite.sql.SqlDataTypeSpec;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
@@ -342,6 +353,129 @@ public class IgnitePlanner implements Planner,
RelOptTable.ViewExpander {
return root.withRel(converter.trimUnusedFields(dml || ordered,
root.rel));
}
+ /**
+ * When rewriting sub-queries to {@code LogicalCorrelate} instances,
correlate nodes with the same correlation ids
+ * can be created (if there was more than one sub-query using the same
correlate table). It's not a problem, when
+ * rows are processed one by one (like in the enumerable convention), but
Ignite execution nodes process batches
+ * of rows, and execution nodes in some cases can get unexpected values
for correlated variables.
+ *
+ * <p>This method resolves collisions between correlation variables. In the
left hand of a LogicalCorrelate, a duplicated
+ * correlation variable and its usages are replaced with a new one. For
example:
+ *
+ * <p>LogicalCorrelate(correlation=[$cor0])
LogicalCorrelate(correlation=[$cor0])
+ * LogicalCorrelate(correlation=[$cor0]) transforms to
LogicalCorrelate(correlation=[$cor1])
+ * ... condition=[=($cor0.A, $0)] ... ...
condition=[=($cor1.A, $0)] ...
+ * ... condition=[=($cor0.A, $0)] ... ...
condition=[=($cor0.A, $0)] ...
+ *
+ * <p>In the right hand of a LogicalCorrelate, a duplicated LogicalCorrelate
is simply replaced with a regular join.
+ * For example:
+ *
+ * <p>LogicalCorrelate(correlation=[$cor0])
LogicalCorrelate(correlation=[$cor0])
+ * ... transforms to ...
+ * LogicalCorrelate(correlation=[$cor0])
LogicalJoin(condition=true)
+ *
+ * @param rel Relational expression tree.
+ * @return Relational expression without collisions in correlates.
+ */
+ public RelNode replaceCorrelatesCollisions(RelNode rel) {
+ RelShuttle relShuttle = new RelHomogeneousShuttle() {
+ /** Set of used correlates. */
+ private final Set<CorrelationId> usedSet = new HashSet<>();
+
+ /** Map to find correlates, that should be replaced (in the left
hand of correlate). */
+ private final Map<CorrelationId, CorrelationId> replaceMap = new
HashMap<>();
+
+ /** Multiset to find correlates, that should be removed (in the
right hand of correlate). */
+ private final Map<CorrelationId, Integer> removeMap = new
HashMap<>();
+
+ private final RexShuttle rexShuttle = new RexShuttle() {
+ @Override public RexNode visitCorrelVariable(RexCorrelVariable
variable) {
+ CorrelationId newCorId = replaceMap.get(variable.id);
+
+ if (newCorId != null) {
+ return
cluster().getRexBuilder().makeCorrel(variable.getType(), newCorId);
+ } else {
+ return variable;
+ }
+ }
+ };
+
+ /** {@inheritDoc} */
+ @Override public RelNode visit(LogicalCorrelate correlate) {
+ CorrelationId corId = correlate.getCorrelationId();
+
+ if (usedSet.contains(corId)) {
+ if (removeMap.containsKey(corId)) {
+ // We are in the right hand of correlate by corId:
replace correlate with join.
+ RelNode join = LogicalJoin.create(
+ correlate.getLeft(),
+ correlate.getRight(),
+ List.of(),
+ cluster().getRexBuilder().makeLiteral(true),
+ Set.of(),
+ correlate.getJoinType()
+ );
+
+ return super.visit(join);
+ } else {
+ // We are in the left hand of correlate by corId:
replace correlate variable.
+ CorrelationId newCorId = cluster().createCorrel();
+ CorrelationId oldCorId = replaceMap.put(corId,
newCorId);
+
+ try {
+ correlate = correlate.copy(
+ correlate.getTraitSet(),
+ correlate.getLeft(),
+ correlate.getRight(),
+ newCorId,
+ correlate.getRequiredColumns(),
+ correlate.getJoinType()
+ );
+
+ return visitLeftAndRightCorrelateHands(correlate,
corId);
+ } finally {
+ if (oldCorId == null) {
+ replaceMap.remove(corId);
+ } else {
+ replaceMap.put(corId, oldCorId);
+ }
+ }
+ }
+ } else {
+ usedSet.add(corId);
+
+ return visitLeftAndRightCorrelateHands(correlate, corId);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public RelNode visit(RelNode other) {
+ RelNode next = super.visit(other);
+
+ return replaceMap.isEmpty() ? next : next.accept(rexShuttle);
+ }
+
+ private RelNode visitLeftAndRightCorrelateHands(LogicalCorrelate
correlate, CorrelationId corId) {
+ RelNode node = correlate;
+
+ node = visitChild(node, 0, correlate.getLeft());
+
+ removeMap.compute(corId, (k, v) -> v == null ? 1 : v + 1);
+
+ try {
+ node = visitChild(node, 1, correlate.getRight());
+ } finally {
+ removeMap.compute(corId, (k, v) -> v == 1 ? null : v - 1);
+ }
+
+ return node;
+ }
+ };
+
+ return relShuttle.visit(rel);
+ }
+
private SqlToRelConverter sqlToRelConverter(SqlValidator validator,
CalciteCatalogReader reader,
SqlToRelConverter.Config config) {
return new IgniteSqlToRelConvertor(this, validator, reader, cluster(),
convertletTbl, config);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerHelper.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerHelper.java
index c3b98fad0..857768261 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerHelper.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerHelper.java
@@ -84,6 +84,8 @@ public class PlannerHelper {
// Transformation chain
rel = planner.transform(PlannerPhase.HEP_DECORRELATE,
rel.getTraitSet(), rel);
+ rel = planner.replaceCorrelatesCollisions(rel);
+
rel = planner.trimUnusedFields(root.withRel(rel)).rel;
rel = planner.transform(PlannerPhase.HEP_FILTER_PUSH_DOWN,
rel.getTraitSet(), rel);
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/CorrelatedSubqueryPlannerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/CorrelatedSubqueryPlannerTest.java
index 664fdd59b..203edb15b 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/CorrelatedSubqueryPlannerTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/CorrelatedSubqueryPlannerTest.java
@@ -18,11 +18,23 @@
package org.apache.ignite.internal.sql.engine.planner;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import java.util.List;
+import java.util.Set;
+import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.logical.LogicalCorrelate;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rex.RexCorrelVariable;
import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.ignite.internal.sql.engine.prepare.IgnitePlanner;
+import org.apache.ignite.internal.sql.engine.prepare.PlannerPhase;
+import org.apache.ignite.internal.sql.engine.prepare.PlanningContext;
import
org.apache.ignite.internal.sql.engine.rel.IgniteCorrelatedNestedLoopJoin;
import org.apache.ignite.internal.sql.engine.rel.IgniteFilter;
import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
@@ -76,5 +88,154 @@ public class CorrelatedSubqueryPlannerTest extends
AbstractPlannerTest {
join.getLeft().getRowType()
);
}
+
+ /**
+ * Test verifies resolving of collisions in the left hand of correlates.
+ */
+ @Test
+ public void testCorrelatesCollisionsLeftHand() throws Exception {
+ IgniteSchema schema = createSchema(
+ createTable("T1", IgniteDistributions.single(), "A",
Integer.class,
+ "B", Integer.class, "C", Integer.class, "D",
Integer.class)
+ );
+
+ String sql = "SELECT * FROM t1 as cor WHERE "
+ + "EXISTS (SELECT 1 FROM t1 WHERE t1.b = cor.a) AND "
+ + "EXISTS (SELECT 1 FROM t1 WHERE t1.c = cor.a) AND "
+ + "EXISTS (SELECT 1 FROM t1 WHERE t1.d = cor.a)";
+
+ PlanningContext ctx = plannerCtx(sql, schema);
+
+ try (IgnitePlanner planner = ctx.planner()) {
+ RelNode rel = convertSubQueries(planner, ctx);
+
+ List<LogicalCorrelate> correlates = findNodes(rel,
byClass(LogicalCorrelate.class));
+
+ assertEquals(3, correlates.size());
+
+ // There are collisions by correlation id.
+ assertEquals(correlates.get(0).getCorrelationId(),
correlates.get(1).getCorrelationId());
+ assertEquals(correlates.get(0).getCorrelationId(),
correlates.get(2).getCorrelationId());
+
+ rel = planner.replaceCorrelatesCollisions(rel);
+
+ correlates = findNodes(rel, byClass(LogicalCorrelate.class));
+
+ assertEquals(3, correlates.size());
+
+ // There are no collisions by correlation id.
+
assertFalse(correlates.get(0).getCorrelationId().equals(correlates.get(1).getCorrelationId()));
+
assertFalse(correlates.get(0).getCorrelationId().equals(correlates.get(2).getCorrelationId()));
+
assertFalse(correlates.get(1).getCorrelationId().equals(correlates.get(2).getCorrelationId()));
+
+ List<LogicalFilter> filters = findNodes(rel,
byClass(LogicalFilter.class)
+ .and(f -> RexUtils.hasCorrelation(((Filter)
f).getCondition())));
+
+ assertEquals(3, filters.size());
+
+ // Filters match correlates in reverse order (we find outer
correlate first, but inner filter first).
+ assertEquals(Set.of(correlates.get(0).getCorrelationId()),
+
RexUtils.extractCorrelationIds(filters.get(2).getCondition()));
+
+ assertEquals(Set.of(correlates.get(1).getCorrelationId()),
+
RexUtils.extractCorrelationIds(filters.get(1).getCondition()));
+
+ assertEquals(Set.of(correlates.get(2).getCorrelationId()),
+
RexUtils.extractCorrelationIds(filters.get(0).getCondition()));
+ }
+ }
+
+ /**
+ * Test verifies resolving of collisions in the right hand of correlates.
+ */
+ @Test
+ public void testCorrelatesCollisionsRightHand() throws Exception {
+ IgniteSchema schema = createSchema(
+ createTable("T1", IgniteDistributions.single(), "A",
Integer.class)
+ );
+
+ String sql = "SELECT (SELECT (SELECT (SELECT cor.a))) FROM t1 as cor";
+
+ PlanningContext ctx = plannerCtx(sql, schema);
+
+ try (IgnitePlanner planner = ctx.planner()) {
+ RelNode rel = convertSubQueries(planner, ctx);
+
+ List<LogicalCorrelate> correlates = findNodes(rel,
byClass(LogicalCorrelate.class));
+
+ assertEquals(3, correlates.size());
+
+ // There are collisions by correlation id.
+ assertEquals(correlates.get(0).getCorrelationId(),
correlates.get(1).getCorrelationId());
+ assertEquals(correlates.get(0).getCorrelationId(),
correlates.get(2).getCorrelationId());
+
+ rel = planner.replaceCorrelatesCollisions(rel);
+
+ correlates = findNodes(rel, byClass(LogicalCorrelate.class));
+
+ assertEquals(1, correlates.size());
+ }
+ }
+
+ /**
+ * Test verifies resolving of collisions in right and left hands of
correlates.
+ */
+ @Test
+ public void testCorrelatesCollisionsMixed() throws Exception {
+ IgniteSchema schema = createSchema(
+ createTable("T1", IgniteDistributions.single(), "A",
Integer.class,
+ "B", Integer.class, "C", Integer.class)
+ );
+
+ String sql = "SELECT * FROM t1 as cor WHERE "
+ + "EXISTS (SELECT 1 FROM t1 WHERE t1.b = (SELECT cor.a)) AND "
+ + "EXISTS (SELECT 1 FROM t1 WHERE t1.c = (SELECT cor.a))";
+
+ PlanningContext ctx = plannerCtx(sql, schema);
+
+ try (IgnitePlanner planner = ctx.planner()) {
+ RelNode rel = convertSubQueries(planner, ctx);
+
+ List<LogicalCorrelate> correlates = findNodes(rel,
byClass(LogicalCorrelate.class));
+
+ assertEquals(4, correlates.size());
+
+ // There are collisions by correlation id.
+ assertEquals(correlates.get(0).getCorrelationId(),
correlates.get(1).getCorrelationId());
+ assertEquals(correlates.get(0).getCorrelationId(),
correlates.get(2).getCorrelationId());
+ assertEquals(correlates.get(0).getCorrelationId(),
correlates.get(3).getCorrelationId());
+
+ rel = planner.replaceCorrelatesCollisions(rel);
+
+ correlates = findNodes(rel, byClass(LogicalCorrelate.class));
+
+ assertEquals(2, correlates.size());
+
+ // There are no collisions by correlation id.
+
assertFalse(correlates.get(0).getCorrelationId().equals(correlates.get(1).getCorrelationId()));
+
+ List<LogicalProject> projects = findNodes(rel,
byClass(LogicalProject.class)
+ .and(f -> RexUtils.hasCorrelation(((Project)
f).getProjects())));
+
+ assertEquals(2, projects.size());
+
+ assertEquals(Set.of(correlates.get(0).getCorrelationId()),
+
RexUtils.extractCorrelationIds(projects.get(1).getProjects()));
+
+ assertEquals(Set.of(correlates.get(1).getCorrelationId()),
+
RexUtils.extractCorrelationIds(projects.get(0).getProjects()));
+ }
+ }
+
+ private RelNode convertSubQueries(IgnitePlanner planner, PlanningContext
ctx) throws Exception {
+ // Parse and validate.
+ SqlNode sqlNode = planner.validate(planner.parse(ctx.query()));
+
+ // Create original logical plan.
+ RelNode rel = planner.rel(sqlNode).rel;
+
+ // Convert sub-queries to correlates.
+ return planner.transform(PlannerPhase.HEP_DECORRELATE,
rel.getTraitSet(), rel);
+ }
}