This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 68533e0d984 IGNITE-22590 SQL Calcite: Fix rehashing of inputs
containing null values - Fixes #11410.
68533e0d984 is described below
commit 68533e0d984681709fca11d8c422d18bc2b23e97
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Fri Aug 9 10:28:47 2024 +0300
IGNITE-22590 SQL Calcite: Fix rehashing of inputs containing null values -
Fixes #11410.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../query/calcite/exec/LogicalRelImplementor.java | 21 ++++++++
.../integration/JoinRehashIntegrationTest.java | 60 ++++++++++++++++------
2 files changed, 64 insertions(+), 17 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index 941ea64c494..3a07b684625 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -34,6 +34,7 @@ import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.core.Minus;
import org.apache.calcite.rel.core.Spool;
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.util.ImmutableBitSet;
@@ -172,6 +173,26 @@ public class LogicalRelImplementor<Row> implements
IgniteRelVisitor<Node<Row>> {
Node<Row> input = visit(rel.getInput());
+ if (distribution.function().affinity()) { // Affinity key can't be
null, so filter out null values.
+ assert distribution.getKeys().size() == 1 : "Unexpected affinity
keys count: " +
+ distribution.getKeys().size() + ", must be 1";
+
+ int affKey = distribution.getKeys().get(0);
+
+ RelDataTypeField affFld =
rel.getRowType().getFieldList().get(affKey);
+
+ assert affFld != null : "Unexpected affinity key field: " + affKey;
+
+ if (affFld.getType().isNullable()) {
+ FilterNode<Row> filter = new FilterNode<>(ctx,
rel.getRowType(),
+ r -> ctx.rowHandler().get(affKey, r) != null);
+
+ filter.register(input);
+
+ input = filter;
+ }
+ }
+
outbox.register(input);
mailboxRegistry.register(outbox);
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/JoinRehashIntegrationTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/JoinRehashIntegrationTest.java
index ad7fc45434f..8b6c46aba79 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/JoinRehashIntegrationTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/JoinRehashIntegrationTest.java
@@ -32,6 +32,49 @@ public class JoinRehashIntegrationTest extends
AbstractBasicIntegrationTest {
/** Test that resources (in particular inboxes) are cleaned up after
executing join with rehashing. */
@Test
public void testResourceCleanup() throws Exception {
+ prepareTables();
+
+ String sql = "SELECT sum(i.price * i.amount)" +
+ " FROM order_items i JOIN orders o ON o.id=i.orderId" +
+ " WHERE o.region = ?";
+
+ assertQuery(sql)
+ .withParams("region0")
+ .matches(QueryChecker.containsSubPlan("IgniteMergeJoin"))
+
.matches(QueryChecker.containsSubPlan("IgniteExchange(distribution=[affinity"))
+ .returns(BigDecimal.valueOf(270))
+ .check();
+
+ // Here we only start queries and wait for result, actual resource
clean up is checked by
+ // AbstractBasicIntegrationTest.afterTest method.
+ GridTestUtils.runMultiThreaded(() -> {
+ for (int i = 0; i < 100; i++)
+ sql(sql, i % 10);
+ }, 10, "query_starter");
+ }
+
+ /** Tests that null values are filtered out on rehashing. */
+ @Test
+ public void testNullAffinityKeys() {
+ prepareTables();
+
+ // Add null values.
+ for (int i = 0; i < 10; i++)
+ sql("INSERT INTO order_items VALUES(?, null, null, null)",
"null_key_" + i);
+
+ String sql = "SELECT sum(i.price * i.amount)" +
+ " FROM order_items i JOIN orders o ON o.id=i.orderId" +
+ " WHERE o.region = ?";
+
+ assertQuery(sql)
+ .withParams("region0")
+
.matches(QueryChecker.containsSubPlan("IgniteExchange(distribution=[affinity"))
+ .returns(BigDecimal.valueOf(270))
+ .check();
+ }
+
+ /** Prepare tables orders and order_items with data. */
+ private void prepareTables() {
sql("CREATE TABLE order_items (\n" +
" id varchar,\n" +
" orderId int,\n" +
@@ -54,22 +97,5 @@ public class JoinRehashIntegrationTest extends
AbstractBasicIntegrationTest {
for (int j = 0; j < 20; j++)
sql("INSERT INTO order_items VALUES(?, ?, ?, ?)", i + "_" + j,
i, i / 10.0, j % 10);
}
-
- String sql = "SELECT sum(i.price * i.amount)" +
- " FROM order_items i JOIN orders o ON o.id=i.orderId" +
- " WHERE o.region = ?";
-
- assertQuery(sql)
- .withParams("region0")
- .matches(QueryChecker.containsSubPlan("IgniteMergeJoin"))
- .returns(BigDecimal.valueOf(270))
- .check();
-
- // Here we only start queries and wait for result, actual resource
clean up is checked by
- // AbstractBasicIntegrationTest.afterTest method.
- GridTestUtils.runMultiThreaded(() -> {
- for (int i = 0; i < 100; i++)
- sql(sql, i % 10);
- }, 10, "query_starter");
}
}