This is an automated email from the ASF dual-hosted git repository.

zstan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 15ebaffa3 IGNITE-16761 Sql. Join with correlated subqueries returns 
wrong result - Fixes #756.
15ebaffa3 is described below

commit 15ebaffa3c3e0cade2364beef739a848dfc749fd
Author: zstan <[email protected]>
AuthorDate: Wed Apr 6 17:20:54 2022 +0300

    IGNITE-16761 Sql. Join with correlated subqueries returns wrong result - 
Fixes #756.
    
    Signed-off-by: zstan <[email protected]>
---
 .../internal/sql/engine/ItCorrelatesTest.java      |  25 ++++
 .../internal/sql/engine/prepare/IgnitePlanner.java | 134 +++++++++++++++++
 .../internal/sql/engine/prepare/PlannerHelper.java |   2 +
 .../planner/CorrelatedSubqueryPlannerTest.java     | 161 +++++++++++++++++++++
 4 files changed, 322 insertions(+)

diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java
index 0da0c503a..0a7475d95 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java
@@ -74,4 +74,29 @@ public class ItCorrelatesTest extends 
AbstractBasicIntegrationTest {
                 .returns(4)
                 .check();
     }
+
+    /**
+     * Tests resolving of collisions in correlates.
+     */
+    @Test
+    public void testCorrelatesCollision() {
+        sql("CREATE TABLE test1 (a INTEGER PRIMARY KEY, b INTEGER)");
+        sql("INSERT INTO test1 VALUES (11, 1), (12, 2), (13, 3)");
+        sql("CREATE TABLE test2 (a INTEGER PRIMARY KEY, c INTEGER)");
+        sql("INSERT INTO test2 VALUES (11, 1), (12, 1), (13, 4)");
+
+        // Collision by correlate variables in the left hand.
+        assertQuery("SELECT * FROM test1 WHERE "
+                + "EXISTS(SELECT * FROM test2 WHERE test1.a=test2.a AND 
test1.b<>test2.c) "
+                + "AND NOT EXISTS(SELECT * FROM test2 WHERE test1.a=test2.a 
AND test1.b<test2.c)")
+                .returns(12, 2)
+                .check();
+
+        // Collision by correlate variables in both, left and right hands.
+        assertQuery("SELECT * FROM test1 WHERE "
+                + "EXISTS(SELECT * FROM test2 WHERE (SELECT test1.a)=test2.a 
AND (SELECT test1.b)<>test2.c) "
+                + "AND NOT EXISTS(SELECT * FROM test2 WHERE (SELECT 
test1.a)=test2.a AND (SELECT test1.b)<test2.c)")
+                .returns(12, 2)
+                .check();
+    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgnitePlanner.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgnitePlanner.java
index ce0ffb00b..b1695d071 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgnitePlanner.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgnitePlanner.java
@@ -21,7 +21,10 @@ import java.io.PrintWriter;
 import java.io.Reader;
 import java.io.StringWriter;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import org.apache.calcite.plan.Context;
 import org.apache.calcite.plan.RelOptCluster;
@@ -37,13 +40,21 @@ import org.apache.calcite.plan.RelTraitDef;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.plan.volcano.VolcanoPlanner;
 import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.RelHomogeneousShuttle;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.RelShuttle;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.logical.LogicalCorrelate;
+import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.metadata.CachingRelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCorrelVariable;
 import org.apache.calcite.rex.RexExecutor;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
 import org.apache.calcite.sql.SqlDataTypeSpec;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
@@ -342,6 +353,129 @@ public class IgnitePlanner implements Planner, 
RelOptTable.ViewExpander {
         return root.withRel(converter.trimUnusedFields(dml || ordered, 
root.rel));
     }
 
+    /**
+     * When rewriting sub-queries to {@code LogicalCorrelate} instances, 
correlate nodes with the same correlation ids
+     * can be created (if there was more then one sub-query using the same 
correlate table). It's not a problem, when
+     * rows are processed one by one (like in the enumerable convension), but 
Ignite execution nodes process batches
+     * of rows, and execution nodes in some cases can get unexpected values 
for correlated variables.
+     *
+     * <p>This method replaces collisions by variables in correlates. For the 
left hand of LogicalCorrelate duplicated
+     * correlated variable and it's usages replaced with the new one. For 
example:
+     *
+     * <p>LogicalCorrelate(correlation=[$cor0])                       
LogicalCorrelate(correlation=[$cor0])
+     *   LogicalCorrelate(correlation=[$cor0])    transforms to      
LogicalCorrelate(correlation=[$cor1])
+     *     ... condition=[=($cor0.A, $0)] ...                          ... 
condition=[=($cor1.A, $0)] ...
+     *   ... condition=[=($cor0.A, $0)] ...                          ... 
condition=[=($cor0.A, $0)] ...
+     *
+     * <p>For the right hand of LogicalCorrelate duplicated LogicalCorrelate 
is just replaced with regular join.
+     * For example:
+     *
+     * <p>LogicalCorrelate(correlation=[$cor0])                       
LogicalCorrelate(correlation=[$cor0])
+     *   ...                                      transforms to      ...
+     *   LogicalCorrelate(correlation=[$cor0])                       
LogicalJoin(condition=true)
+     *
+     * @param rel Relational expression tree.
+     * @return Relational expression without collisions in correlates.
+     */
+    public RelNode replaceCorrelatesCollisions(RelNode rel) {
+        RelShuttle relShuttle = new RelHomogeneousShuttle() {
+            /** Set of used correlates. */
+            private final Set<CorrelationId> usedSet = new HashSet<>();
+
+            /** Map to find correlates, that should be replaced (in the left 
hand of correlate). */
+            private final Map<CorrelationId, CorrelationId> replaceMap = new 
HashMap<>();
+
+            /** Multiset to find correlates, that should be removed (in the 
right hand of correlate). */
+            private final Map<CorrelationId, Integer> removeMap = new 
HashMap<>();
+
+            private final RexShuttle rexShuttle = new RexShuttle() {
+                @Override public RexNode visitCorrelVariable(RexCorrelVariable 
variable) {
+                    CorrelationId newCorId = replaceMap.get(variable.id);
+
+                    if (newCorId != null) {
+                        return 
cluster().getRexBuilder().makeCorrel(variable.getType(), newCorId);
+                    } else {
+                        return variable;
+                    }
+                }
+            };
+
+            /** {@inheritDoc} */
+            @Override public RelNode visit(LogicalCorrelate correlate) {
+                CorrelationId corId = correlate.getCorrelationId();
+
+                if (usedSet.contains(corId)) {
+                    if (removeMap.containsKey(corId)) {
+                        // We are in the right hand of correlate by corId: 
replace correlate with join.
+                        RelNode join = LogicalJoin.create(
+                                correlate.getLeft(),
+                                correlate.getRight(),
+                                List.of(),
+                                cluster().getRexBuilder().makeLiteral(true),
+                                Set.of(),
+                                correlate.getJoinType()
+                        );
+
+                        return super.visit(join);
+                    } else {
+                        // We are in the right hand of correlate by corId: 
replace correlate variable.
+                        CorrelationId newCorId = cluster().createCorrel();
+                        CorrelationId oldCorId = replaceMap.put(corId, 
newCorId);
+
+                        try {
+                            correlate = correlate.copy(
+                                    correlate.getTraitSet(),
+                                    correlate.getLeft(),
+                                    correlate.getRight(),
+                                    newCorId,
+                                    correlate.getRequiredColumns(),
+                                    correlate.getJoinType()
+                            );
+
+                            return visitLeftAndRightCorrelateHands(correlate, 
corId);
+                        } finally {
+                            if (oldCorId == null) {
+                                replaceMap.remove(corId);
+                            } else {
+                                replaceMap.put(corId, oldCorId);
+                            }
+                        }
+                    }
+                } else {
+                    usedSet.add(corId);
+
+                    return visitLeftAndRightCorrelateHands(correlate, corId);
+                }
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            public RelNode visit(RelNode other) {
+                RelNode next = super.visit(other);
+
+                return replaceMap.isEmpty() ? next : next.accept(rexShuttle);
+            }
+
+            private RelNode visitLeftAndRightCorrelateHands(LogicalCorrelate 
correlate, CorrelationId corId) {
+                RelNode node = correlate;
+
+                node = visitChild(node, 0, correlate.getLeft());
+
+                removeMap.compute(corId, (k, v) -> v == null ? 1 : v + 1);
+
+                try {
+                    node = visitChild(node, 1, correlate.getRight());
+                } finally {
+                    removeMap.compute(corId, (k, v) -> v == 1 ? null : v - 1);
+                }
+
+                return node;
+            }
+        };
+
+        return relShuttle.visit(rel);
+    }
+
     private SqlToRelConverter sqlToRelConverter(SqlValidator validator, 
CalciteCatalogReader reader,
             SqlToRelConverter.Config config) {
         return new IgniteSqlToRelConvertor(this, validator, reader, cluster(), 
convertletTbl, config);
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerHelper.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerHelper.java
index c3b98fad0..857768261 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerHelper.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerHelper.java
@@ -84,6 +84,8 @@ public class PlannerHelper {
             // Transformation chain
             rel = planner.transform(PlannerPhase.HEP_DECORRELATE, 
rel.getTraitSet(), rel);
 
+            rel = planner.replaceCorrelatesCollisions(rel);
+
             rel = planner.trimUnusedFields(root.withRel(rel)).rel;
 
             rel = planner.transform(PlannerPhase.HEP_FILTER_PUSH_DOWN, 
rel.getTraitSet(), rel);
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/CorrelatedSubqueryPlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/CorrelatedSubqueryPlannerTest.java
index 664fdd59b..203edb15b 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/CorrelatedSubqueryPlannerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/CorrelatedSubqueryPlannerTest.java
@@ -18,11 +18,23 @@
 package org.apache.ignite.internal.sql.engine.planner;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 
+import java.util.List;
+import java.util.Set;
+import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.logical.LogicalCorrelate;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rex.RexCorrelVariable;
 import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.ignite.internal.sql.engine.prepare.IgnitePlanner;
+import org.apache.ignite.internal.sql.engine.prepare.PlannerPhase;
+import org.apache.ignite.internal.sql.engine.prepare.PlanningContext;
 import 
org.apache.ignite.internal.sql.engine.rel.IgniteCorrelatedNestedLoopJoin;
 import org.apache.ignite.internal.sql.engine.rel.IgniteFilter;
 import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
@@ -76,5 +88,154 @@ public class CorrelatedSubqueryPlannerTest extends 
AbstractPlannerTest {
                 join.getLeft().getRowType()
         );
     }
+
+    /**
+     * Test verifies resolving of collisions in the left hand of correlates.
+     */
+    @Test
+    public void testCorrelatesCollisionsLeftHand() throws Exception {
+        IgniteSchema schema = createSchema(
+                createTable("T1", IgniteDistributions.single(), "A", 
Integer.class,
+                        "B", Integer.class, "C", Integer.class, "D", 
Integer.class)
+        );
+
+        String sql = "SELECT * FROM t1 as cor WHERE "
+                + "EXISTS (SELECT 1 FROM t1 WHERE t1.b = cor.a) AND "
+                + "EXISTS (SELECT 1 FROM t1 WHERE t1.c = cor.a) AND "
+                + "EXISTS (SELECT 1 FROM t1 WHERE t1.d = cor.a)";
+
+        PlanningContext ctx = plannerCtx(sql, schema);
+
+        try (IgnitePlanner planner = ctx.planner()) {
+            RelNode rel = convertSubQueries(planner, ctx);
+
+            List<LogicalCorrelate> correlates = findNodes(rel, 
byClass(LogicalCorrelate.class));
+
+            assertEquals(3, correlates.size());
+
+            // There are collisions by correlation id.
+            assertEquals(correlates.get(0).getCorrelationId(), 
correlates.get(1).getCorrelationId());
+            assertEquals(correlates.get(0).getCorrelationId(), 
correlates.get(2).getCorrelationId());
+
+            rel = planner.replaceCorrelatesCollisions(rel);
+
+            correlates = findNodes(rel, byClass(LogicalCorrelate.class));
+
+            assertEquals(3, correlates.size());
+
+            // There are no collisions by correlation id.
+            
assertFalse(correlates.get(0).getCorrelationId().equals(correlates.get(1).getCorrelationId()));
+            
assertFalse(correlates.get(0).getCorrelationId().equals(correlates.get(2).getCorrelationId()));
+            
assertFalse(correlates.get(1).getCorrelationId().equals(correlates.get(2).getCorrelationId()));
+
+            List<LogicalFilter> filters = findNodes(rel, 
byClass(LogicalFilter.class)
+                    .and(f -> RexUtils.hasCorrelation(((Filter) 
f).getCondition())));
+
+            assertEquals(3, filters.size());
+
+            // Filters match correlates in reverse order (we find outer 
correlate first, but inner filter first).
+            assertEquals(Set.of(correlates.get(0).getCorrelationId()),
+                    
RexUtils.extractCorrelationIds(filters.get(2).getCondition()));
+
+            assertEquals(Set.of(correlates.get(1).getCorrelationId()),
+                    
RexUtils.extractCorrelationIds(filters.get(1).getCondition()));
+
+            assertEquals(Set.of(correlates.get(2).getCorrelationId()),
+                    
RexUtils.extractCorrelationIds(filters.get(0).getCondition()));
+        }
+    }
+
+    /**
+     * Test verifies resolving of collisions in the right hand of correlates.
+     */
+    @Test
+    public void testCorrelatesCollisionsRightHand() throws Exception {
+        IgniteSchema schema = createSchema(
+                createTable("T1", IgniteDistributions.single(), "A", 
Integer.class)
+        );
+
+        String sql = "SELECT (SELECT (SELECT (SELECT cor.a))) FROM t1 as cor";
+
+        PlanningContext ctx = plannerCtx(sql, schema);
+
+        try (IgnitePlanner planner = ctx.planner()) {
+            RelNode rel = convertSubQueries(planner, ctx);
+
+            List<LogicalCorrelate> correlates = findNodes(rel, 
byClass(LogicalCorrelate.class));
+
+            assertEquals(3, correlates.size());
+
+            // There are collisions by correlation id.
+            assertEquals(correlates.get(0).getCorrelationId(), 
correlates.get(1).getCorrelationId());
+            assertEquals(correlates.get(0).getCorrelationId(), 
correlates.get(2).getCorrelationId());
+
+            rel = planner.replaceCorrelatesCollisions(rel);
+
+            correlates = findNodes(rel, byClass(LogicalCorrelate.class));
+
+            assertEquals(1, correlates.size());
+        }
+    }
+
+    /**
+     * Test verifies resolving of collisions in right and left hands of 
correlates.
+     */
+    @Test
+    public void testCorrelatesCollisionsMixed() throws Exception {
+        IgniteSchema schema = createSchema(
+                createTable("T1", IgniteDistributions.single(), "A", 
Integer.class,
+                        "B", Integer.class, "C", Integer.class)
+        );
+
+        String sql = "SELECT * FROM t1 as cor WHERE "
+                + "EXISTS (SELECT 1 FROM t1 WHERE t1.b = (SELECT cor.a)) AND "
+                + "EXISTS (SELECT 1 FROM t1 WHERE t1.c = (SELECT cor.a))";
+
+        PlanningContext ctx = plannerCtx(sql, schema);
+
+        try (IgnitePlanner planner = ctx.planner()) {
+            RelNode rel = convertSubQueries(planner, ctx);
+
+            List<LogicalCorrelate> correlates = findNodes(rel, 
byClass(LogicalCorrelate.class));
+
+            assertEquals(4, correlates.size());
+
+            // There are collisions by correlation id.
+            assertEquals(correlates.get(0).getCorrelationId(), 
correlates.get(1).getCorrelationId());
+            assertEquals(correlates.get(0).getCorrelationId(), 
correlates.get(2).getCorrelationId());
+            assertEquals(correlates.get(0).getCorrelationId(), 
correlates.get(3).getCorrelationId());
+
+            rel = planner.replaceCorrelatesCollisions(rel);
+
+            correlates = findNodes(rel, byClass(LogicalCorrelate.class));
+
+            assertEquals(2, correlates.size());
+
+            // There are no collisions by correlation id.
+            
assertFalse(correlates.get(0).getCorrelationId().equals(correlates.get(1).getCorrelationId()));
+
+            List<LogicalProject> projects = findNodes(rel, 
byClass(LogicalProject.class)
+                    .and(f -> RexUtils.hasCorrelation(((Project) 
f).getProjects())));
+
+            assertEquals(2, projects.size());
+
+            assertEquals(Set.of(correlates.get(0).getCorrelationId()),
+                    
RexUtils.extractCorrelationIds(projects.get(1).getProjects()));
+
+            assertEquals(Set.of(correlates.get(1).getCorrelationId()),
+                    
RexUtils.extractCorrelationIds(projects.get(0).getProjects()));
+        }
+    }
+
+    private RelNode convertSubQueries(IgnitePlanner planner, PlanningContext 
ctx) throws Exception {
+        // Parse and validate.
+        SqlNode sqlNode = planner.validate(planner.parse(ctx.query()));
+
+        // Create original logical plan.
+        RelNode rel = planner.rel(sqlNode).rel;
+
+        // Convert sub-queries to correlates.
+        return planner.transform(PlannerPhase.HEP_DECORRELATE, 
rel.getTraitSet(), rel);
+    }
 }
 

Reply via email to