This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 68533e0d984 IGNITE-22590 SQL Calcite: Fix rehashing of inputs 
containing null values - Fixes #11410.
68533e0d984 is described below

commit 68533e0d984681709fca11d8c422d18bc2b23e97
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Fri Aug 9 10:28:47 2024 +0300

    IGNITE-22590 SQL Calcite: Fix rehashing of inputs containing null values - 
Fixes #11410.
    
    Signed-off-by: Aleksey Plekhanov <[email protected]>
---
 .../query/calcite/exec/LogicalRelImplementor.java  | 21 ++++++++
 .../integration/JoinRehashIntegrationTest.java     | 60 ++++++++++++++++------
 2 files changed, 64 insertions(+), 17 deletions(-)

diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index 941ea64c494..3a07b684625 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -34,6 +34,7 @@ import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.core.Minus;
 import org.apache.calcite.rel.core.Spool;
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.ImmutableBitSet;
@@ -172,6 +173,26 @@ public class LogicalRelImplementor<Row> implements 
IgniteRelVisitor<Node<Row>> {
 
         Node<Row> input = visit(rel.getInput());
 
+        if (distribution.function().affinity()) { // Affinity key can't be 
null, so filter out null values.
+            assert distribution.getKeys().size() == 1 : "Unexpected affinity 
keys count: " +
+                distribution.getKeys().size() + ", must be 1";
+
+            int affKey = distribution.getKeys().get(0);
+
+            RelDataTypeField affFld = 
rel.getRowType().getFieldList().get(affKey);
+
+            assert affFld != null : "Unexpected affinity key field: " + affKey;
+
+            if (affFld.getType().isNullable()) {
+                FilterNode<Row> filter = new FilterNode<>(ctx, 
rel.getRowType(),
+                    r -> ctx.rowHandler().get(affKey, r) != null);
+
+                filter.register(input);
+
+                input = filter;
+            }
+        }
+
         outbox.register(input);
 
         mailboxRegistry.register(outbox);
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/JoinRehashIntegrationTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/JoinRehashIntegrationTest.java
index ad7fc45434f..8b6c46aba79 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/JoinRehashIntegrationTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/JoinRehashIntegrationTest.java
@@ -32,6 +32,49 @@ public class JoinRehashIntegrationTest extends 
AbstractBasicIntegrationTest {
     /** Test that resources (in particular inboxes) are cleaned up after 
executing join with rehashing. */
     @Test
     public void testResourceCleanup() throws Exception {
+        prepareTables();
+
+        String sql = "SELECT sum(i.price * i.amount)" +
+            " FROM order_items i JOIN orders o ON o.id=i.orderId" +
+            " WHERE o.region = ?";
+
+        assertQuery(sql)
+            .withParams("region0")
+            .matches(QueryChecker.containsSubPlan("IgniteMergeJoin"))
+            
.matches(QueryChecker.containsSubPlan("IgniteExchange(distribution=[affinity"))
+            .returns(BigDecimal.valueOf(270))
+            .check();
+
+        // Here we only start queries and wait for result, actual resource 
clean up is checked by
+        // AbstractBasicIntegrationTest.afterTest method.
+        GridTestUtils.runMultiThreaded(() -> {
+            for (int i = 0; i < 100; i++)
+                sql(sql, i % 10);
+        }, 10, "query_starter");
+    }
+
+    /** Tests that null values are filtered out on rehashing. */
+    @Test
+    public void testNullAffinityKeys() {
+        prepareTables();
+
+        // Add null values.
+        for (int i = 0; i < 10; i++)
+            sql("INSERT INTO order_items VALUES(?, null, null, null)", 
"null_key_" + i);
+
+        String sql = "SELECT sum(i.price * i.amount)" +
+            " FROM order_items i JOIN orders o ON o.id=i.orderId" +
+            " WHERE o.region = ?";
+
+        assertQuery(sql)
+            .withParams("region0")
+            
.matches(QueryChecker.containsSubPlan("IgniteExchange(distribution=[affinity"))
+            .returns(BigDecimal.valueOf(270))
+            .check();
+    }
+
+    /** Prepare tables orders and order_items with data. */
+    private void prepareTables() {
         sql("CREATE TABLE order_items (\n" +
             "    id varchar,\n" +
             "    orderId int,\n" +
@@ -54,22 +97,5 @@ public class JoinRehashIntegrationTest extends 
AbstractBasicIntegrationTest {
             for (int j = 0; j < 20; j++)
                 sql("INSERT INTO order_items VALUES(?, ?, ?, ?)", i + "_" + j, 
i, i / 10.0, j % 10);
         }
-
-        String sql = "SELECT sum(i.price * i.amount)" +
-            " FROM order_items i JOIN orders o ON o.id=i.orderId" +
-            " WHERE o.region = ?";
-
-        assertQuery(sql)
-            .withParams("region0")
-            .matches(QueryChecker.containsSubPlan("IgniteMergeJoin"))
-            .returns(BigDecimal.valueOf(270))
-            .check();
-
-        // Here we only start queries and wait for result, actual resource 
clean up is checked by
-        // AbstractBasicIntegrationTest.afterTest method.
-        GridTestUtils.runMultiThreaded(() -> {
-            for (int i = 0; i < 100; i++)
-                sql(sql, i % 10);
-        }, 10, "query_starter");
     }
 }

Reply via email to