This is an automated email from the ASF dual-hosted git repository.
jooger pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 5a0369a75f IGNITE-21311: Introduce pruning for SQL correlated scans
(#3318)
5a0369a75f is described below
commit 5a0369a75ff59d03bc07835832dbbaac5aa0e88d
Author: Max Zhuravkov <[email protected]>
AuthorDate: Wed Mar 13 15:39:42 2024 +0200
IGNITE-21311: Introduce pruning for SQL correlated scans (#3318)
---
.../benchmark/SqlPartitionPruningBenchmark.java | 62 +++++-
.../internal/sql/engine/ItCorrelatesTest.java | 56 ++++-
.../sql/engine/exec/DynamicPartitionProvider.java | 59 ++++++
.../internal/sql/engine/exec/ExecutionContext.java | 16 ++
.../sql/engine/exec/ExecutionServiceImpl.java | 5 +-
.../sql/engine/exec/LogicalRelImplementor.java | 15 +-
.../sql/engine/exec/PartitionProvider.java | 61 ++++++
.../sql/engine/exec/StaticPartitionProvider.java | 45 ++++
.../sql/engine/exec/mapping/ColocationGroup.java | 14 +-
.../engine/exec/mapping/FragmentDescription.java | 13 +-
.../sql/engine/exec/mapping/MappedFragment.java | 23 +-
.../engine/exec/mapping/MappingServiceImpl.java | 3 +-
.../sql/engine/exec/rel/IndexScanNode.java | 20 +-
.../sql/engine/exec/rel/TableScanNode.java | 21 +-
.../sql/engine/externalize/RelJsonReader.java | 25 ++-
.../sql/engine/externalize/RelJsonWriter.java | 17 ++
.../prepare/pruning/PartitionPrunerImpl.java | 17 +-
.../prepare/pruning/PartitionPruningColumns.java | 91 +++++++-
.../prepare/pruning/PartitionPruningMetadata.java | 5 +-
.../pruning/PartitionPruningMetadataExtractor.java | 17 +-
.../prepare/pruning/PartitionPruningPredicate.java | 78 +++++--
.../sql/engine/exec/PartitionProvidersTest.java | 234 +++++++++++++++++++++
.../engine/exec/exp/ExpressionFactoryImplTest.java | 2 +-
.../engine/exec/mapping/FragmentMappingTest.java | 6 +-
.../sql/engine/exec/mapping/FragmentPrinter.java | 22 ++
.../sql/engine/exec/rel/AbstractExecutionTest.java | 2 +-
.../sql/engine/exec/rel/ExchangeExecutionTest.java | 6 +-
.../exec/rel/IndexScanNodeExecutionTest.java | 4 +-
.../engine/exec/rel/ModifyNodeExecutionTest.java | 2 +-
.../exec/rel/TableScanNodeExecutionTest.java | 4 +-
.../sql/engine/framework/TestBuilders.java | 16 +-
.../sql/engine/planner/PartitionPruningTest.java | 26 +++
.../pruning/PartitionPruningPredicateSelfTest.java | 45 +++-
.../pruning/PruningMetadataSerializationTest.java | 94 +++++++++
.../src/test/resources/mapping/correlated.test | 135 ++++++++++++
.../resources/mapping/test_partition_pruning.test | 72 +++++++
36 files changed, 1227 insertions(+), 106 deletions(-)
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlPartitionPruningBenchmark.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlPartitionPruningBenchmark.java
index bd56cdf6c0..baf68f2b9a 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlPartitionPruningBenchmark.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlPartitionPruningBenchmark.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.benchmark;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@@ -85,10 +87,28 @@ public class SqlPartitionPruningBenchmark extends
AbstractMultiNodeBenchmark {
List.of("key1")
);
- KeyValueView<Tuple, Tuple> keyValueView =
clusterNode.tables().table("usertable2").keyValueView();
+ createTable("usertable3",
+ List.of(
+ "key1 int",
+ "key2 int",
+ "field1 varchar(100)"
+ ),
+ List.of("key1", "key2"),
+ List.of("key1")
+ );
+
+ initTable("usertable2", 10);
+ initTable("usertable3", 1);
+
+ session = clusterNode.sql().createSession();
+ }
+
+ private static void initTable(String tableName, int fieldCount) {
+ KeyValueView<Tuple, Tuple> keyValueView =
clusterNode.tables().table(tableName).keyValueView();
try (Session session = clusterNode.sql().createSession()) {
- try (var rs = session.execute(null, "CREATE INDEX
usertable2_sorted_idx ON usertable2 USING TREE (key1, key2)")) {
+ String query = format("CREATE INDEX {}_sorted_idx ON {} USING TREE
(key1, key2)", tableName, tableName);
+ try (var rs = session.execute(null, query)) {
while (rs.hasNext()) {
rs.next();
}
@@ -99,7 +119,7 @@ public class SqlPartitionPruningBenchmark extends
AbstractMultiNodeBenchmark {
for (int i = 0; i < TABLE_SIZE; i++) {
Tuple t = Tuple.create();
- for (int j = 1; j <= 10; j++) {
+ for (int j = 1; j <= fieldCount; j++) {
t.set("field" + j, FIELD_VAL);
}
@@ -108,8 +128,6 @@ public class SqlPartitionPruningBenchmark extends
AbstractMultiNodeBenchmark {
keyValueView.put(null, key, t);
}
-
- session = clusterNode.sql().createSession();
}
/** Select by key - should use key value plan. */
@@ -136,7 +154,7 @@ public class SqlPartitionPruningBenchmark extends
AbstractMultiNodeBenchmark {
/** Select by a single colocation key - should use a scan w/o partition
pruning because such predicate is too complex. */
@Benchmark
- public void selectWithNoPrunining(Blackhole bh) {
+ public void selectWithNoPruning(Blackhole bh) {
ThreadLocalRandom random = ThreadLocalRandom.current();
int key = random.nextInt(TABLE_SIZE);
@@ -145,12 +163,40 @@ public class SqlPartitionPruningBenchmark extends
AbstractMultiNodeBenchmark {
}
}
+ /** Correlated subquery with partition pruning. */
+ @Benchmark
+ public void selectCorrelated(Blackhole bh) {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ int key = random.nextInt(TABLE_SIZE);
+
+ String query = "SELECT * FROM usertable2 as cor WHERE EXISTS "
+ + "(SELECT 1 FROM usertable3 WHERE usertable3.key1 = cor.key1)
AND key1=?";
+
+ try (var rs = session.execute(null, query, key)) {
+ expectSingleRecord(rs, bh);
+ }
+ }
+
+ /** Correlated subquery without partition pruning. */
+ @Benchmark
+ public void selectCorrelatedNoPruning(Blackhole bh) {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ int key = random.nextInt(TABLE_SIZE);
+
+ String query = "SELECT * FROM usertable2 as cor WHERE EXISTS "
+ + "(SELECT 1 FROM usertable3 WHERE usertable3.key1 >= cor.key1
AND usertable3.key1 < cor.key1 + 1) AND key1=?";
+
+ try (var rs = session.execute(null, query, key)) {
+ expectSingleRecord(rs, bh);
+ }
+ }
+
/**
* Benchmark's entry point.
*/
public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
- .include(".*" +
SqlPartitionPruningBenchmark.class.getSimpleName() + ".*")
+ .include(".*" +
SqlPartitionPruningBenchmark.class.getSimpleName() + ".selectCorrelated*")
.build();
new Runner(opt).run();
@@ -168,7 +214,7 @@ public class SqlPartitionPruningBenchmark extends
AbstractMultiNodeBenchmark {
i += 1;
}
if (i != 1) {
- throw new IllegalArgumentException("There should be exactly 1
output row");
+ throw new IllegalArgumentException("There should be exactly 1
output row but got " + i);
}
}
}
diff --git
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java
index bfb84bc68b..99a804b890 100644
---
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java
+++
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java
@@ -37,9 +37,6 @@ public class ItCorrelatesTest extends BaseSqlIntegrationTest {
public void testCorrelatesAssignedBeforeAccess() {
sql("create table test_tbl(k INTEGER primary key, v INTEGER)");
- // TODO: IGNITE-16323 When the issue is not fixed the invocation
required for update metadata.
- CLUSTER.aliveNode().tables().tables();
-
sql("INSERT INTO test_tbl VALUES (1, 1)");
assertQuery("SELECT " + DISABLED_JOIN_RULES + " t0.v, (SELECT t0.v +
t1.v FROM test_tbl t1) AS j FROM test_tbl t0")
@@ -85,4 +82,57 @@ public class ItCorrelatesTest extends BaseSqlIntegrationTest
{
.returns(12, 2)
.check();
}
+
+ @Test
+ public void testCorrelations() {
+ sql("CREATE TABLE t1 (id INTEGER PRIMARY KEY, val INTEGER)");
+ sql("CREATE TABLE t2 (id INTEGER PRIMARY KEY, val INTEGER)");
+ sql("CREATE TABLE t3 (id INTEGER PRIMARY KEY, val INTEGER)");
+
+ sql("INSERT INTO t1 VALUES(1, 2)");
+ sql("INSERT INTO t1 VALUES(13, 14)");
+ sql("INSERT INTO t1 VALUES(42, 43)");
+
+ sql("INSERT INTO t2 VALUES(1, 2)");
+ sql("INSERT INTO t2 VALUES(42, 43)");
+
+ sql("INSERT INTO t3 VALUES(1, 11)");
+ sql("INSERT INTO t3 VALUES(13, 23)");
+ sql("INSERT INTO t3 VALUES(42, 52)");
+
+ // t1 -> t2 (t2 references t1)
+
+ assertQuery("SELECT * FROM t1 as cor WHERE EXISTS (SELECT 1 FROM t2
WHERE t2.id = cor.id)")
+ .returns(1, 2)
+ .returns(42, 43)
+ .check();
+
+ assertQuery("SELECT * FROM t1 as cor WHERE NOT EXISTS (SELECT 1 FROM
t2 WHERE t2.id = cor.id)")
+ .returns(13, 14)
+ .check();
+
+ // t3 -> t1 -> t2 (t2 references t1)
+
+ assertQuery("SELECT * FROM t3 AS out\n"
+ + "WHERE EXISTS (SELECT * FROM t1 as cor WHERE out.id = cor.id
AND EXISTS "
+ + "(SELECT 1 FROM t2 WHERE t2.id = cor.id))")
+ .returns(1, 11)
+ .returns(42, 52)
+ .check();
+
+ assertQuery("SELECT * FROM t3 AS out\n"
+ + "WHERE NOT EXISTS (SELECT * FROM t1 as cor WHERE out.id =
cor.id AND EXISTS "
+ + "(SELECT 1 FROM t2 WHERE t2.id = cor.id))")
+ .returns(13, 23)
+ .check();
+
+ // t3 -> t1 -> t2 (t2 references both t3 and t1)
+
+ assertQuery("SELECT * FROM t3 AS out\n"
+ + "WHERE EXISTS (SELECT * FROM t1 as cor WHERE out.id = cor.id
AND EXISTS "
+ + "(SELECT 1 FROM t2 WHERE t2.id = out.id OR t2.id = cor.id))")
+ .returns(1, 11)
+ .returns(42, 52)
+ .check();
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/DynamicPartitionProvider.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/DynamicPartitionProvider.java
new file mode 100644
index 0000000000..8910043a01
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/DynamicPartitionProvider.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import java.util.List;
+import org.apache.ignite.internal.sql.engine.exec.exp.ExpressionFactory;
+import
org.apache.ignite.internal.sql.engine.prepare.pruning.PartitionPruningColumns;
+import
org.apache.ignite.internal.sql.engine.prepare.pruning.PartitionPruningPredicate;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+
+/**
+ * Partition provider that returns partitions based on the values provided
by the execution context at runtime.
+ */
+public class DynamicPartitionProvider<RowT> implements PartitionProvider<RowT>
{
+
+ private final String nodeName;
+
+ private final List<NodeWithConsistencyToken> assignments;
+
+ private final PartitionPruningColumns columns;
+
+ private final IgniteTable table;
+
+ /** Constructor. */
+ public DynamicPartitionProvider(
+ String nodeName,
+ List<NodeWithConsistencyToken> assignments,
+ PartitionPruningColumns columns,
+ IgniteTable table
+ ) {
+ this.nodeName = nodeName;
+ this.assignments = assignments;
+ this.columns = columns;
+ this.table = table;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public List<PartitionWithConsistencyToken>
getPartitions(ExecutionContext<RowT> ctx) {
+ ExpressionFactory<RowT> expressionFactory = ctx.expressionFactory();
+
+ return PartitionPruningPredicate.prunePartitions(columns, table,
expressionFactory, assignments, nodeName);
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
index bf4a9e1b44..75ca44ef80 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
@@ -43,6 +43,9 @@ import
org.apache.ignite.internal.sql.engine.exec.exp.ExpressionFactory;
import org.apache.ignite.internal.sql.engine.exec.exp.ExpressionFactoryImpl;
import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup;
import org.apache.ignite.internal.sql.engine.exec.mapping.FragmentDescription;
+import
org.apache.ignite.internal.sql.engine.prepare.pruning.PartitionPruningColumns;
+import
org.apache.ignite.internal.sql.engine.prepare.pruning.PartitionPruningMetadata;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
import org.apache.ignite.internal.sql.engine.util.TypeUtils;
import org.apache.ignite.network.ClusterNode;
@@ -372,6 +375,19 @@ public class ExecutionContext<RowT> implements DataContext
{
return cancelFlag.get();
}
+ /** Creates {@link PartitionProvider} for the given source table. */
+ public PartitionProvider<RowT> getPartitionProvider(long sourceId,
ColocationGroup group, IgniteTable table) {
+ PartitionPruningMetadata metadata =
description.partitionPruningMetadata();
+ PartitionPruningColumns columns = metadata != null ?
metadata.get(sourceId) : null;
+ String nodeName = localNode.name();
+
+ if (columns == null) {
+ return new StaticPartitionProvider<>(nodeName, group, sourceId);
+ } else {
+ return new DynamicPartitionProvider<>(nodeName,
group.assignments(), columns, table);
+ }
+ }
+
/** {@inheritDoc} */
@Override
public boolean equals(Object o) {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index 6ba934e07f..89a1c07932 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -133,7 +133,7 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
private static final List<InternalSqlRow> NOT_APPLIED_ANSWER = List.of(new
InternalSqlRowSingleBoolean(false));
private static final FragmentDescription DUMMY_DESCRIPTION = new
FragmentDescription(
- 0, true, Long2ObjectMaps.emptyMap(), null, null
+ 0, true, Long2ObjectMaps.emptyMap(), null, null, null
);
private final MessageService messageService;
@@ -922,7 +922,8 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
!fragment.correlated(),
mappedFragment.groupsBySourceId(),
mappedFragment.target(),
- mappedFragment.sourcesByExchangeId()
+ mappedFragment.sourcesByExchangeId(),
+ mappedFragment.partitionPruningMetadata()
);
for (String nodeName : mappedFragment.nodes()) {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
index 2240e5f80e..6b44790bd7 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.sql.engine.exec;
import static org.apache.calcite.rel.RelDistribution.Type.HASH_DISTRIBUTED;
-import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static
org.apache.ignite.internal.sql.engine.util.TypeUtils.combinedRowType;
import static
org.apache.ignite.internal.sql.engine.util.TypeUtils.rowSchemaFromRelTypes;
import static org.apache.ignite.internal.util.ArrayUtils.asList;
@@ -392,9 +391,7 @@ public class LogicalRelImplementor<RowT> implements
IgniteRelVisitor<Node<RowT>>
RowSchema rowSchema =
rowSchemaFromRelTypes(RelOptUtil.getFieldTypeList(rowType));
RowFactory<RowT> rowFactory = ctx.rowHandler().factory(rowSchema);
-
- List<PartitionWithConsistencyToken> partitions =
group.partitionsWithConsistencyTokens(ctx.localNode().name());
- assert !partitions.isEmpty() : format("No partitions for node {}
group: {}", ctx.localNode().name(), group);
+ PartitionProvider<RowT> partitionProvider =
ctx.getPartitionProvider(rel.sourceId(), group, tbl);
return new IndexScanNode<>(
ctx,
@@ -402,7 +399,7 @@ public class LogicalRelImplementor<RowT> implements
IgniteRelVisitor<Node<RowT>>
idx,
scannableTable,
tbl.descriptor(),
- partitions,
+ partitionProvider,
comp,
ranges,
filters,
@@ -428,7 +425,8 @@ public class LogicalRelImplementor<RowT> implements
IgniteRelVisitor<Node<RowT>>
Predicate<RowT> filters = condition == null ? null :
expressionFactory.predicate(condition, rowType);
Function<RowT, RowT> prj = projects == null ? null :
expressionFactory.project(projects, rowType);
- ColocationGroup group = ctx.group(rel.sourceId());
+ long sourceId = rel.sourceId();
+ ColocationGroup group = ctx.group(sourceId);
assert group != null;
@@ -439,14 +437,13 @@ public class LogicalRelImplementor<RowT> implements
IgniteRelVisitor<Node<RowT>>
RowSchema rowSchema =
rowSchemaFromRelTypes(RelOptUtil.getFieldTypeList(rowType));
RowFactory<RowT> rowFactory = ctx.rowHandler().factory(rowSchema);
- List<PartitionWithConsistencyToken> partitions =
group.partitionsWithConsistencyTokens(ctx.localNode().name());
- assert !partitions.isEmpty() : format("No partitions for node {}
group: {}", ctx.localNode().name(), group);
+ PartitionProvider<RowT> partitionProvider =
ctx.getPartitionProvider(rel.sourceId(), group, tbl);
return new TableScanNode<>(
ctx,
rowFactory,
scannableTable,
- partitions,
+ partitionProvider,
filters,
prj,
requiredColumns == null ? null : requiredColumns.toBitSet()
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/PartitionProvider.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/PartitionProvider.java
new file mode 100644
index 0000000000..35e1789e0d
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/PartitionProvider.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Returns a list of partitions for a particular scan operation.
+ */
+@FunctionalInterface
+public interface PartitionProvider<RowT> {
+ /**
+ * Returns a list of partitions for a scan operation.
+ *
+ * @param ctx Execution context.
+ * @return a list of partitions.
+ */
+ List<PartitionWithConsistencyToken> getPartitions(ExecutionContext<RowT>
ctx);
+
+ /** Returns a partition provider that always returns the given list of
partitions. */
+ static <RowT> PartitionProvider<RowT>
fromPartitions(List<PartitionWithConsistencyToken> partitions) {
+ return ctx -> partitions;
+ }
+
+ /**
+ * Returns a list of partitions that belong to the given node.
+ *
+ * @param assignments Assignments.
+ * @param nodeName node name.
+ *
+ * @return List of partitions.
+ */
+ static List<PartitionWithConsistencyToken>
partitionsForNode(List<NodeWithConsistencyToken> assignments, String nodeName) {
+ List<PartitionWithConsistencyToken> result = new ArrayList<>();
+
+ for (int i = 0; i < assignments.size(); i++) {
+ NodeWithConsistencyToken a = assignments.get(i);
+ if (a.name().equals(nodeName)) {
+ result.add(new PartitionWithConsistencyToken(i,
a.enlistmentConsistencyToken()));
+ }
+ }
+
+ return result;
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/StaticPartitionProvider.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/StaticPartitionProvider.java
new file mode 100644
index 0000000000..82b3f807fc
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/StaticPartitionProvider.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+
+import java.util.List;
+import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup;
+
+/**
+ * Returns a list of partitions computed at mapping stage. This provider never
returns an empty list.
+ */
+public class StaticPartitionProvider<RowT> implements PartitionProvider<RowT> {
+
+ private final List<PartitionWithConsistencyToken> partitions;
+
+ /** Constructor. */
+ public StaticPartitionProvider(String nodeName, ColocationGroup
colocationGroup, long sourceId) {
+ List<PartitionWithConsistencyToken> partitions =
colocationGroup.partitionsWithConsistencyTokens(nodeName);
+ assert !partitions.isEmpty() : format("No partitions for node {}
group: {}", nodeName, colocationGroup);
+
+ this.partitions = partitions;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public List<PartitionWithConsistencyToken>
getPartitions(ExecutionContext<RowT> ctx) {
+ return partitions;
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ColocationGroup.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ColocationGroup.java
index 9e60674a24..b3d440518c 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ColocationGroup.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ColocationGroup.java
@@ -18,12 +18,12 @@
package org.apache.ignite.internal.sql.engine.exec.mapping;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.ignite.internal.sql.engine.exec.NodeWithConsistencyToken;
+import org.apache.ignite.internal.sql.engine.exec.PartitionProvider;
import
org.apache.ignite.internal.sql.engine.exec.PartitionWithConsistencyToken;
/**
@@ -99,17 +99,7 @@ public class ColocationGroup implements Serializable {
if (partitionsPerNode != null) {
return partitionsPerNode.getOrDefault(nodeName,
Collections.emptyList());
} else {
- List<PartitionWithConsistencyToken> partitions = new ArrayList<>();
-
- for (int p = 0; p < assignments.size(); p++) {
- NodeWithConsistencyToken nodeWithConsistencyToken =
assignments.get(p);
-
- if (Objects.equals(nodeName, nodeWithConsistencyToken.name()))
{
- partitions.add(new PartitionWithConsistencyToken(p,
nodeWithConsistencyToken.enlistmentConsistencyToken()));
- }
- }
-
- return partitions;
+ return PartitionProvider.partitionsForNode(assignments, nodeName);
}
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentDescription.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentDescription.java
index 10de4a37d5..f9bfbbb883 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentDescription.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentDescription.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.sql.engine.exec.mapping;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import java.io.Serializable;
import java.util.List;
+import
org.apache.ignite.internal.sql.engine.prepare.pruning.PartitionPruningMetadata;
import org.jetbrains.annotations.Nullable;
/**
@@ -35,6 +36,7 @@ public class FragmentDescription implements Serializable {
private final Long2ObjectMap<ColocationGroup> groupsBySourceId;
private final @Nullable ColocationGroup target;
private final @Nullable Long2ObjectMap<List<String>> sourcesByExchangeId;
+ private final @Nullable PartitionPruningMetadata pruningMetadata;
/**
* Constructor.
@@ -50,13 +52,15 @@ public class FragmentDescription implements Serializable {
boolean prefetch,
Long2ObjectMap<ColocationGroup> groupsBySourceId,
@Nullable ColocationGroup target,
- @Nullable Long2ObjectMap<List<String>> sourcesByExchangeId
+ @Nullable Long2ObjectMap<List<String>> sourcesByExchangeId,
+ @Nullable PartitionPruningMetadata pruningMetadata
) {
this.fragmentId = fragmentId;
this.prefetch = prefetch;
this.groupsBySourceId = groupsBySourceId;
this.target = target;
this.sourcesByExchangeId = sourcesByExchangeId;
+ this.pruningMetadata = pruningMetadata;
}
/** Returns {@code true} if it's safe to execute this fragment in advance.
*/
@@ -95,4 +99,11 @@ public class FragmentDescription implements Serializable {
public @Nullable ColocationGroup group(long sourceId) {
return groupsBySourceId.get(sourceId);
}
+
+ /**
+ * Returns partition pruning metadata.
+ */
+ public @Nullable PartitionPruningMetadata partitionPruningMetadata() {
+ return pruningMetadata;
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappedFragment.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappedFragment.java
index b5ad0f9fe4..ede13d3d86 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappedFragment.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappedFragment.java
@@ -26,6 +26,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.ignite.internal.sql.engine.prepare.Fragment;
+import
org.apache.ignite.internal.sql.engine.prepare.pruning.PartitionPruningMetadata;
import org.jetbrains.annotations.Nullable;
/**
@@ -43,13 +44,15 @@ public class MappedFragment {
private final Long2ObjectMap<ColocationGroup> groupsBySourceId;
private final @Nullable Long2ObjectMap<List<String>> sourcesByExchangeId;
private final @Nullable ColocationGroup target;
+ private final PartitionPruningMetadata partitionPruningMetadata;
/** Constructor. */
MappedFragment(
Fragment fragment,
List<ColocationGroup> groups,
@Nullable Long2ObjectMap<List<String>> sourcesByExchangeId,
- @Nullable ColocationGroup target
+ @Nullable ColocationGroup target,
+ @Nullable PartitionPruningMetadata pruningMetadata
) {
this.fragment = fragment;
this.groups = groups;
@@ -69,6 +72,7 @@ public class MappedFragment {
this.groupsBySourceId = groupsBySourceId;
this.sourcesByExchangeId = sourcesByExchangeId;
this.target = target;
+ this.partitionPruningMetadata = pruningMetadata;
}
/** Constructor. */
@@ -77,7 +81,8 @@ public class MappedFragment {
List<ColocationGroup> groups, List<String> nodes,
Long2ObjectMap<ColocationGroup> groupsBySourceId,
@Nullable Long2ObjectMap<List<String>> sourcesByExchangeId,
- @Nullable ColocationGroup target
+ @Nullable ColocationGroup target,
+ @Nullable PartitionPruningMetadata pruningMetadata
) {
this.fragment = fragment;
this.nodes = List.copyOf(nodes);
@@ -85,6 +90,7 @@ public class MappedFragment {
this.sourcesByExchangeId = sourcesByExchangeId;
this.target = target;
this.groups = groups;
+ this.partitionPruningMetadata = pruningMetadata;
}
public Fragment fragment() {
@@ -111,6 +117,10 @@ public class MappedFragment {
return sourcesByExchangeId;
}
+ public @Nullable PartitionPruningMetadata partitionPruningMetadata() {
+ return partitionPruningMetadata;
+ }
+
/**
* Creates a fragment by replacing the given colocation groups.
*
@@ -130,7 +140,7 @@ public class MappedFragment {
}
}
- return new MappedFragment(fragment, newGroups, sourcesByExchangeId,
target);
+ return new MappedFragment(fragment, newGroups, sourcesByExchangeId,
target, partitionPruningMetadata);
}
/**
@@ -149,6 +159,11 @@ public class MappedFragment {
newSourcesByExchangeId.put(exchangeId, newNodes);
// The nodes should remain the same in order to preserve connectivity
between fragments.
- return new MappedFragment(fragment, groups, nodes, groupsBySourceId,
newSourcesByExchangeId, target);
+ return new MappedFragment(fragment, groups, nodes, groupsBySourceId,
newSourcesByExchangeId, target, partitionPruningMetadata);
+ }
+
+ /** Adds partition pruning metadata to this fragment. */
+ public MappedFragment
withPartitionPruningMetadata(PartitionPruningMetadata pruningMetadata) {
+ return new MappedFragment(fragment, groups, sourcesByExchangeId,
target, pruningMetadata);
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
index a3402efc42..13bf05cb35 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
@@ -269,7 +269,8 @@ public class MappingServiceImpl implements MappingService,
LogicalTopologyEventL
fragment,
mapping.groups(),
sourcesByExchangeId,
- targetGroup
+ targetGroup,
+ null
)
);
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
index e85ac8da02..987b8fdf9d 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
@@ -28,6 +28,7 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.PartitionProvider;
import
org.apache.ignite.internal.sql.engine.exec.PartitionWithConsistencyToken;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
import org.apache.ignite.internal.sql.engine.exec.ScannableTable;
@@ -52,8 +53,8 @@ public class IndexScanNode<RowT> extends
StorageScanNode<RowT> {
private final RowHandler.RowFactory<RowT> factory;
- /** List of pairs containing the partition number to scan with the
corresponding enlistment consistency token. */
- private final Collection<PartitionWithConsistencyToken>
partsWithConsistencyTokens;
+ /** Returns partitions to be used by this scan. */
+ private final PartitionProvider<RowT> partitionProvider;
/** Participating columns. */
private final @Nullable BitSet requiredColumns;
@@ -70,8 +71,7 @@ public class IndexScanNode<RowT> extends
StorageScanNode<RowT> {
* @param ctx Execution context.
* @param rowFactory Row factory.
* @param tableDescriptor Table descriptor.
- * @param partsWithConsistencyTokens List of pairs containing the
partition number to scan with the corresponding enlistment
- * consistency token.
+ * @param partitionProvider Partition provider.
* @param comp Rows comparator.
* @param rangeConditions Range conditions.
* @param filters Optional filter to filter out rows.
@@ -84,7 +84,7 @@ public class IndexScanNode<RowT> extends
StorageScanNode<RowT> {
IgniteIndex schemaIndex,
ScannableTable table,
TableDescriptor tableDescriptor,
- Collection<PartitionWithConsistencyToken>
partsWithConsistencyTokens,
+ PartitionProvider<RowT> partitionProvider,
@Nullable Comparator<RowT> comp,
@Nullable RangeIterable<RowT> rangeConditions,
@Nullable Predicate<RowT> filters,
@@ -93,11 +93,9 @@ public class IndexScanNode<RowT> extends
StorageScanNode<RowT> {
) {
super(ctx, filters, rowTransformer);
- assert partsWithConsistencyTokens != null &&
!partsWithConsistencyTokens.isEmpty();
-
this.schemaIndex = schemaIndex;
this.table = table;
- this.partsWithConsistencyTokens = partsWithConsistencyTokens;
+ this.partitionProvider = partitionProvider;
this.requiredColumns = requiredColumns;
this.rangeConditions = rangeConditions;
this.comp = comp;
@@ -113,11 +111,13 @@ public class IndexScanNode<RowT> extends
StorageScanNode<RowT> {
/** {@inheritDoc} */
@Override
protected Publisher<RowT> scan() {
+ List<PartitionWithConsistencyToken> partitions =
partitionProvider.getPartitions(context());
+
if (rangeConditions != null) {
return SubscriptionUtils.concat(
- new TransformingIterator<>(rangeConditions.iterator(),
cond -> indexPublisher(partsWithConsistencyTokens, cond)));
+ new TransformingIterator<>(rangeConditions.iterator(),
cond -> indexPublisher(partitions, cond)));
} else {
- return indexPublisher(partsWithConsistencyTokens, null);
+ return indexPublisher(partitions, null);
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java
index bcfcb1e76f..5425978b95 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java
@@ -18,12 +18,13 @@
package org.apache.ignite.internal.sql.engine.exec.rel;
import java.util.BitSet;
-import java.util.Collection;
import java.util.Iterator;
+import java.util.List;
import java.util.concurrent.Flow.Publisher;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.PartitionProvider;
import
org.apache.ignite.internal.sql.engine.exec.PartitionWithConsistencyToken;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
@@ -40,8 +41,8 @@ public class TableScanNode<RowT> extends
StorageScanNode<RowT> {
/** Table that provides access to underlying data. */
private final ScannableTable table;
- /** List of pairs containing the partition number to scan with the
corresponding enlistment consistency token. */
- private final Collection<PartitionWithConsistencyToken>
partsWithConsistencyTokens;
+ /** Returns partitions to be used by this scan. */
+ private final PartitionProvider<RowT> partitionProvider;
private final RowFactory<RowT> rowFactory;
@@ -53,7 +54,7 @@ public class TableScanNode<RowT> extends
StorageScanNode<RowT> {
* @param ctx Execution context.
* @param rowFactory Row factory.
* @param table Internal table.
- * @param partsWithConsistencyTokens List of pairs containing the
partition number to scan with the corresponding enlistment
+ * @param partitionProvider Provider of the pairs containing the partition number
to scan with the corresponding enlistment
* consistency token.
* @param filters Optional filter to filter out rows.
* @param rowTransformer Optional projection function.
@@ -63,17 +64,15 @@ public class TableScanNode<RowT> extends
StorageScanNode<RowT> {
ExecutionContext<RowT> ctx,
RowHandler.RowFactory<RowT> rowFactory,
ScannableTable table,
- Collection<PartitionWithConsistencyToken>
partsWithConsistencyTokens,
+ PartitionProvider<RowT> partitionProvider,
@Nullable Predicate<RowT> filters,
@Nullable Function<RowT, RowT> rowTransformer,
@Nullable BitSet requiredColumns
) {
super(ctx, filters, rowTransformer);
- assert partsWithConsistencyTokens != null &&
!partsWithConsistencyTokens.isEmpty();
-
this.table = table;
- this.partsWithConsistencyTokens = partsWithConsistencyTokens;
+ this.partitionProvider = partitionProvider;
this.rowFactory = rowFactory;
this.requiredColumns = requiredColumns;
}
@@ -81,10 +80,10 @@ public class TableScanNode<RowT> extends
StorageScanNode<RowT> {
/** {@inheritDoc} */
@Override
protected Publisher<RowT> scan() {
+ List<PartitionWithConsistencyToken> partitions =
partitionProvider.getPartitions(context());
+
Iterator<Publisher<? extends RowT>> it = new TransformingIterator<>(
- partsWithConsistencyTokens.iterator(),
partWithConsistencyToken -> {
- return table.scan(context(), partWithConsistencyToken, rowFactory,
requiredColumns);
- });
+ partitions.iterator(), p -> table.scan(context(), p,
rowFactory, requiredColumns));
return SubscriptionUtils.concat(it);
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReader.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReader.java
index f251f4c8b2..f5842cb6b8 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReader.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReader.java
@@ -62,7 +62,9 @@ import org.apache.ignite.internal.sql.engine.util.Commons;
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public class RelJsonReader {
- private static final TypeReference<LinkedHashMap<String, Object>> TYPE_REF
= new TypeReference<>() {};
+ static final TypeReference<LinkedHashMap<String, Object>> TYPE_REF = new
TypeReference<>() {};
+
+ private static final Map<String, Object> EMPTY_JSON_RELS = Map.of("rels",
List.of());
private final ObjectMapper mapper = new
ObjectMapper().enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
@@ -84,6 +86,21 @@ public class RelJsonReader {
return (T) reader.read(json);
}
+ /** Creates a {@link RexNode} from the given json. */
+ public static RexNode fromExprJson(String json) {
+ try {
+ RelJsonReader reader = new RelJsonReader(null);
+
+ RelInput relInput = reader.newInput(EMPTY_JSON_RELS);
+
+ LinkedHashMap<String, Object> val = reader.mapper.readValue(json,
TYPE_REF);
+
+ return reader.relJson.toRex(relInput, val);
+ } catch (IOException e) {
+ throw new IgniteInternalException(INTERNAL_ERR, "RelJson
expression serialization error", e);
+ }
+ }
+
/**
* Constructor.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
@@ -120,11 +137,15 @@ public class RelJsonReader {
String id = (String) jsonRel.get("id");
String type = (String) jsonRel.get("relOp");
Function<RelInput, RelNode> factory = relJson.factory(type);
- RelNode rel = factory.apply(new RelInputImpl(jsonRel));
+ RelNode rel = factory.apply(newInput(jsonRel));
relMap.put(id, rel);
lastRel = rel;
}
+ private RelInput newInput(Map<String, Object> jsonRel) {
+ return new RelInputImpl(jsonRel);
+ }
+
private class RelInputImpl implements RelInputEx {
private final Map<String, Object> jsonRel;
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonWriter.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonWriter.java
index 73c8c6e485..95cb8de3d9 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonWriter.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonWriter.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.sql.engine.externalize;
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
@@ -30,6 +31,7 @@ import java.util.List;
import java.util.Map;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.calcite.util.Pair;
import org.apache.ignite.internal.lang.IgniteInternalException;
@@ -66,6 +68,21 @@ public class RelJsonWriter implements RelWriter {
return writer.asString();
}
+ /** Converts the given {@link RexNode} to json. */
+ public static String toExprJson(RexNode node) {
+ ObjectMapper mapper = new ObjectMapper();
+
+ try {
+ RelJson relJson = new RelJson();
+
+ Object map = relJson.toJson(node);
+
+ return mapper.writeValueAsString(map);
+ } catch (JsonProcessingException e) {
+ throw new IgniteInternalException(INTERNAL_ERR, "RelJson
expression serialization error", e);
+ }
+ }
+
/**
* Constructor.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPrunerImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPrunerImpl.java
index 3b3385e503..5b74118f54 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPrunerImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPrunerImpl.java
@@ -71,6 +71,17 @@ public class PartitionPrunerImpl implements PartitionPruner {
continue;
}
+ // Do not update colocation groups when predicates
include correlated variables,
+ // because in that case partitions can be pruned only at runtime.
+ boolean containCorrelatedVariables =
pruningMetadata.data().values()
+ .stream()
+
.anyMatch(PartitionPruningColumns::containCorrelatedVariables);
+
+ if (containCorrelatedVariables) {
+
updatedFragments.add(mappedFragment.withPartitionPruningMetadata(pruningMetadata));
+ continue;
+ }
+
// Update fragment by applying PP metadata.
MappedFragment newFragment =
updateColocationGroups(mappedFragment, pruningMetadata, dynamicParameters);
@@ -161,8 +172,10 @@ public class PartitionPrunerImpl implements
PartitionPruner {
ColocationGroup colocationGroup =
mappedFragment.groupsBySourceId().get(sourceId);
assert colocationGroup != null : "No colocation group#" + sourceId;
- PartitionPruningPredicate pruningPredicate = new
PartitionPruningPredicate(table, pruningColumns, dynamicParameters);
- ColocationGroup newColocationGroup =
pruningPredicate.prunePartitions(colocationGroup);
+ ColocationGroup newColocationGroup =
PartitionPruningPredicate.prunePartitions(
+ table, pruningColumns, dynamicParameters,
+ colocationGroup
+ );
newColocationGroups.put(sourceId, newColocationGroup);
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningColumns.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningColumns.java
index b5e10b2432..8f795316ca 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningColumns.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningColumns.java
@@ -18,13 +18,24 @@
package org.apache.ignite.internal.sql.engine.prepare.pruning;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import org.apache.calcite.rex.RexNode;
+import org.apache.ignite.internal.sql.engine.externalize.RelJsonReader;
+import org.apache.ignite.internal.sql.engine.externalize.RelJsonWriter;
import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteUnsafeDataInput;
+import org.apache.ignite.internal.util.io.IgniteUnsafeDataOutput;
+import org.apache.ignite.lang.ErrorGroups.Common;
+import org.apache.ignite.lang.IgniteException;
import org.jetbrains.annotations.TestOnly;
/**
@@ -37,7 +48,9 @@ import org.jetbrains.annotations.TestOnly;
*
* @see PartitionPruningMetadataExtractor
*/
-public class PartitionPruningColumns {
+public class PartitionPruningColumns implements Serializable {
+
+ private static final long serialVersionUID = 0;
private final List<Int2ObjectMap<RexNode>> columns;
@@ -51,12 +64,23 @@ public class PartitionPruningColumns {
return columns;
}
+ /** Returns {@code true} if these columns contain correlated variables. */
+ public boolean containCorrelatedVariables() {
+ return columns.stream()
+ .anyMatch(c ->
c.values().stream().anyMatch(PartitionPruningMetadataExtractor::isCorrelatedVariable));
+ }
+
/** {@inheritDoc} */
@Override
public String toString() {
return S.toString(PartitionPruningColumns.class, this, "columns",
columns);
}
+ @SuppressWarnings("unused")
+ private SerializedForm writeReplace() {
+ return new SerializedForm(this);
+ }
+
/** Returns column values in canonical form. E.g. {@code [1=2, 0=3]}
becomes {@code [0=3, 1=2]} */
@TestOnly
public static List<List<Map.Entry<Integer, RexNode>>>
canonicalForm(PartitionPruningColumns columns) {
@@ -66,4 +90,69 @@ public class PartitionPruningColumns {
.collect(Collectors.toList()))
.collect(Collectors.toList());
}
+
+ /**
+ * Serialized form to serialize rex nodes.
+ */
+ static class SerializedForm implements Serializable {
+
+ private static final long serialVersionUID = 0;
+
+ private final byte[] bytes;
+
+ private SerializedForm(PartitionPruningColumns columns) {
+ try (IgniteUnsafeDataOutput output = new
IgniteUnsafeDataOutput(256)) {
+ output.writeInt(columns.columns().size());
+
+ for (Int2ObjectMap<RexNode> columnMap : columns.columns()) {
+ output.writeInt(columnMap.size());
+
+ for (Int2ObjectMap.Entry<RexNode> columnValue :
columnMap.int2ObjectEntrySet()) {
+ String exprJson =
RelJsonWriter.toExprJson(columnValue.getValue());
+
+ output.writeInt(columnValue.getIntKey());
+ output.writeUTF(exprJson);
+ }
+ }
+
+ this.bytes = output.array();
+ } catch (IOException e) {
+ throw new IgniteException(Common.INTERNAL_ERR, "Unable to
serialize partition pruning metadata", e);
+ }
+ }
+
+ protected final Object readResolve() {
+ try (IgniteUnsafeDataInput input = new
IgniteUnsafeDataInput(bytes)) {
+ return readColumns(input);
+ } catch (IOException e) {
+ throw new IgniteException(Common.INTERNAL_ERR, "Unable to
deserialize partition pruning metadata", e);
+ }
+ }
+ }
+
+ private static PartitionPruningColumns readColumns(IgniteDataInput input)
throws IOException {
+ int numColumnSets = input.readInt();
+ List<Int2ObjectMap<RexNode>> result = new ArrayList<>(numColumnSets);
+
+ for (int i = 0; i < numColumnSets; i++) {
+ readExpressions(input, result);
+ }
+
+ return new PartitionPruningColumns(result);
+ }
+
+ private static void readExpressions(IgniteDataInput input,
List<Int2ObjectMap<RexNode>> output) throws IOException {
+ int numColumns = input.readInt();
+ Int2ObjectMap<RexNode> expr = new Int2ObjectOpenHashMap<>(numColumns);
+
+ for (int i = 0; i < numColumns; i++) {
+ int key = input.readInt();
+ String exprJson = input.readUTF();
+
+ RexNode rexNode = RelJsonReader.fromExprJson(exprJson);
+ expr.put(key, rexNode);
+ }
+
+ output.add(expr);
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningMetadata.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningMetadata.java
index b2d87869a5..72ad806aea 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningMetadata.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningMetadata.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.sql.engine.prepare.pruning;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
+import java.io.Serializable;
import org.apache.ignite.internal.tostring.S;
import org.jetbrains.annotations.Nullable;
@@ -28,7 +29,9 @@ import org.jetbrains.annotations.Nullable;
* @see PartitionPruningColumns
* @see PartitionPruningMetadataExtractor
*/
-public class PartitionPruningMetadata {
+public class PartitionPruningMetadata implements Serializable {
+
+ private static final long serialVersionUID = 0;
/** Empty metadata. */
public static final PartitionPruningMetadata EMPTY = new
PartitionPruningMetadata(Long2ObjectMaps.emptyMap());
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningMetadataExtractor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningMetadataExtractor.java
index 81e1ec7bfb..9d02293bc6 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningMetadataExtractor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningMetadataExtractor.java
@@ -33,6 +33,7 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexDynamicParam;
+import org.apache.calcite.rex.RexFieldAccess;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexLocalRef;
import org.apache.calcite.rex.RexNode;
@@ -466,7 +467,21 @@ public class PartitionPruningMetadataExtractor extends
IgniteRelShuttle {
}
private static boolean isValueExpr(RexNode node) {
- return node instanceof RexLiteral || node instanceof RexDynamicParam;
+ return node instanceof RexLiteral || node instanceof RexDynamicParam
|| isCorrelatedVariable(node);
+ }
+
+ static boolean isCorrelatedVariable(RexNode node) {
+ // Correlated variables are referenced via field access expressions
+ //
+ // SELECT * FROM t1 as cor WHERE EXISTS (SELECT 1 FROM t2 WHERE t2.c1
= cor.c1)
+ //
+ // So condition `t2.c1 = cor.c1` is translated to $t0 = $cor0.C1
+ if (node.isA(SqlKind.FIELD_ACCESS)) {
+ RexFieldAccess fieldAccess = (RexFieldAccess) node;
+ return fieldAccess.getReferenceExpr().isA(SqlKind.CORREL_VARIABLE);
+ } else {
+ return false;
+ }
}
/** Intermediate result of extracting partition pruning metadata. */
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningPredicate.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningPredicate.java
index 090fa0b097..d26ae19edd 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningPredicate.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningPredicate.java
@@ -41,9 +41,12 @@ import org.apache.calcite.util.ImmutableIntList;
import org.apache.calcite.util.TimestampString;
import org.apache.ignite.internal.sql.engine.exec.NodeWithConsistencyToken;
import
org.apache.ignite.internal.sql.engine.exec.PartitionWithConsistencyToken;
+import org.apache.ignite.internal.sql.engine.exec.exp.ExpressionFactory;
import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup;
+import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
import org.apache.ignite.internal.sql.engine.schema.PartitionCalculator;
+import org.apache.ignite.internal.sql.engine.util.TypeUtils;
import org.apache.ignite.internal.type.NativeType;
import org.apache.ignite.internal.type.NativeTypeSpec;
import org.jetbrains.annotations.Nullable;
@@ -55,34 +58,28 @@ public final class PartitionPruningPredicate {
private static final ZoneId ZONE_ID_UTC = ZoneId.of("UTC");
- private final int tablePartitions;
-
- // TODO: https://issues.apache.org/jira/browse/IGNITE-21543 Remove after
is resolved,
- // remaining partitions should always be not null.
- private final @Nullable IntSet remainingPartitions;
-
- /**
- * Constructor.
- *
- * @param table Table.
- * @param pruningColumns Columns.
- * @param dynamicParameters Values of dynamic parameters.
- */
- public PartitionPruningPredicate(IgniteTable table,
PartitionPruningColumns pruningColumns, Object[] dynamicParameters) {
- this.tablePartitions = table.partitions();
- this.remainingPartitions = computeRemainingPartitions(table,
pruningColumns, dynamicParameters);
+ private PartitionPruningPredicate() {
}
/**
* Applies partition pruning to the given colocation group. This group
should have the same number of assignments as the source table.
*
+ * @param table Table.
+ * @param pruningColumns Partition pruning metadata.
+ * @param dynamicParameters Values of dynamic parameters.
* @param colocationGroup Colocation group.
*
* @return New colocation group.
*/
- public ColocationGroup prunePartitions(ColocationGroup colocationGroup) {
- assert tablePartitions == colocationGroup.assignments().size() :
"Number of partitions does not match";
+ public static ColocationGroup prunePartitions(
+ IgniteTable table,
+ PartitionPruningColumns pruningColumns,
+ Object[] dynamicParameters,
+ ColocationGroup colocationGroup) {
+ assert table.partitions() == colocationGroup.assignments().size() :
"Number of partitions does not match";
+
+ IntSet remainingPartitions = computeRemainingPartitions(table,
pruningColumns, dynamicParameters);
if (remainingPartitions == null) {
return colocationGroup;
}
@@ -121,6 +118,51 @@ public final class PartitionPruningPredicate {
);
}
+ /**
+ * Applies partition pruning to the list of given assignments and returns
a list of partitions belonging to the given node.
+ *
+ * @param pruningColumns Partition pruning metadata.
+ * @param table Table.
+ * @param expressionFactory Expression factory.
+ * @param assignments Assignments.
+ * @param nodeName Node name.
+ *
+ * @return List of partitions that belong to the provided node.
+ */
+ public static <RowT> List<PartitionWithConsistencyToken> prunePartitions(
+ PartitionPruningColumns pruningColumns,
+ IgniteTable table,
+ ExpressionFactory<RowT> expressionFactory,
+ List<NodeWithConsistencyToken> assignments,
+ String nodeName
+ ) {
+ ImmutableIntList keys = table.distribution().getKeys();
+ PartitionCalculator partitionCalculator =
table.partitionCalculator().get();
+ List<PartitionWithConsistencyToken> result = new ArrayList<>();
+
+ for (Int2ObjectMap<RexNode> columns : pruningColumns.columns()) {
+ for (int key : keys) {
+ RexNode node = columns.get(key);
+ ColumnDescriptor descriptor =
table.descriptor().columnDescriptor(key);
+ NativeType physicalType = descriptor.physicalType();
+
+ Object valueInInternalForm =
expressionFactory.execute(node).get();
+ Class<?> storageType =
NativeTypeSpec.toClass(physicalType.spec(), descriptor.nullable());
+ Object value = TypeUtils.fromInternal(valueInInternalForm,
storageType);
+
+ partitionCalculator.append(value);
+ }
+
+ int p = partitionCalculator.partition();
+ NodeWithConsistencyToken token = assignments.get(p);
+
+ if (nodeName.equals(token.name())) {
+ result.add(new PartitionWithConsistencyToken(p,
token.enlistmentConsistencyToken()));
+ }
+ }
+
+ return result;
+ }
private static @Nullable IntSet computeRemainingPartitions(
IgniteTable table,
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/PartitionProvidersTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/PartitionProvidersTest.java
new file mode 100644
index 0000000000..5ae3ff918c
--- /dev/null
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/PartitionProvidersTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.ignite.internal.network.ClusterNodeImpl;
+import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup;
+import org.apache.ignite.internal.sql.engine.exec.mapping.FragmentDescription;
+import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
+import
org.apache.ignite.internal.sql.engine.prepare.pruning.PartitionPruningColumns;
+import
org.apache.ignite.internal.sql.engine.prepare.pruning.PartitionPruningMetadata;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.type.NativeTypes;
+import org.apache.ignite.network.NetworkAddress;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/** Tests for {@link PartitionProvider} implementations. */
+@ExtendWith(MockitoExtension.class)
+public class PartitionProvidersTest extends BaseIgniteAbstractTest {
+
+ private static final long GROUP_ID = 1;
+
+ @Mock
+ private QueryTaskExecutor queryTaskExecutor;
+
+ @Test
+ public void testStaticPartitionProviderWithProvidedPartitions() {
+ List<NodeWithConsistencyToken> assignments = List.of(
+ new NodeWithConsistencyToken("n1", 0),
+ new NodeWithConsistencyToken("n2", 0),
+ new NodeWithConsistencyToken("n3", 0)
+ );
+
+ Map<String, List<PartitionWithConsistencyToken>> partitionsPerNode =
Map.of(
+ "n1", List.of(new PartitionWithConsistencyToken(1, 1L)),
+ "n2", List.of(new PartitionWithConsistencyToken(2, 2L)),
+ "n3", List.of(new PartitionWithConsistencyToken(3, 3L))
+ );
+
+ IgniteTable table = TestBuilders.table()
+ .name("T1")
+ .addKeyColumn("C1", NativeTypes.INT32)
+ .partitions(42)
+ .distribution(IgniteDistributions.affinity(List.of(0), 2, "3"))
+ .partitions(assignments.size())
+ .build();
+
+ ColocationGroup group = new ColocationGroup(
+ List.of(1L, 2L), List.of("n1", "n2", "n3"),
+ assignments,
+ partitionsPerNode
+ );
+
+ {
+ ExecutionContext<Object[]> ctx = newContext("n1", group, null);
+ expectPartitions(ctx, table, List.of(
+ new PartitionWithConsistencyToken(1, 1L)
+ ));
+ }
+
+ {
+ ExecutionContext<Object[]> ctx = newContext("n2", group, null);
+ expectPartitions(ctx, table, List.of(
+ new PartitionWithConsistencyToken(2, 2L)
+ ));
+ }
+
+ {
+ ExecutionContext<Object[]> ctx = newContext("n3", group, null);
+ expectPartitions(ctx, table, List.of(
+ new PartitionWithConsistencyToken(3, 3L)
+ ));
+ }
+ }
+
+ @Test
+ public void
testStaticPartitionProviderUsesAssignmentsWhenNoPartitionsPresent() {
+ List<NodeWithConsistencyToken> assignments = List.of(
+ new NodeWithConsistencyToken("n1", 0),
+ new NodeWithConsistencyToken("n2", 1),
+ new NodeWithConsistencyToken("n1", 2),
+ new NodeWithConsistencyToken("n1", 3),
+ new NodeWithConsistencyToken("n3", 4)
+ );
+
+ IgniteTable table = TestBuilders.table()
+ .name("T1")
+ .addKeyColumn("C1", NativeTypes.INT32)
+ .partitions(assignments.size())
+ .distribution(IgniteDistributions.affinity(List.of(0), 2, "3"))
+ .partitions(assignments.size())
+ .build();
+
+ ColocationGroup group = new ColocationGroup(
+ List.of(1L, 2L), List.of("n1", "n2", "n3"),
+ assignments
+ );
+
+ {
+ ExecutionContext<Object[]> ctx = newContext("n1", group, null);
+ expectPartitions(ctx, table,
+ List.of(
+ new PartitionWithConsistencyToken(0, 0L),
+ new PartitionWithConsistencyToken(2, 2L),
+ new PartitionWithConsistencyToken(3, 3L)
+ )
+ );
+ }
+
+ {
+ ExecutionContext<Object[]> ctx = newContext("n2", group, null);
+ expectPartitions(ctx, table, List.of(new
PartitionWithConsistencyToken(1, 1L)));
+ }
+
+ {
+ ExecutionContext<Object[]> ctx = newContext("n3", group, null);
+ expectPartitions(ctx, table, List.of(new
PartitionWithConsistencyToken(4, 4L)));
+ }
+ }
+
+ @Test
+ public void testDynamicPartitionProvider() {
+ List<NodeWithConsistencyToken> assignments = List.of(
+ new NodeWithConsistencyToken("n1", 0),
+ new NodeWithConsistencyToken("n2", 1),
+ new NodeWithConsistencyToken("n1", 2),
+ new NodeWithConsistencyToken("n1", 3),
+ new NodeWithConsistencyToken("n3", 4)
+ );
+
+ IgniteTable table = TestBuilders.table()
+ .name("T1")
+ .addKeyColumn("C1", NativeTypes.INT32)
+ .partitions(assignments.size())
+ .distribution(IgniteDistributions.affinity(List.of(0), 2, "3"))
+ .partitions(assignments.size())
+ .build();
+
+ ColocationGroup group = new ColocationGroup(
+ List.of(1L, 2L), List.of("n1", "n2", "n3"),
+ assignments,
+ Map.of()
+ );
+
+ PartitionPruningMetadata metadata = newMetadata(1);
+
+ {
+ ExecutionContext<Object[]> ctx = newContext("n1", group, metadata);
+ expectPartitions(ctx, table, List.of());
+ }
+
+ {
+ ExecutionContext<Object[]> ctx = newContext("n2", group, metadata);
+ expectPartitions(ctx, table, List.of(new
PartitionWithConsistencyToken(1, 1L)));
+ }
+
+ {
+ ExecutionContext<Object[]> ctx = newContext("n3", group, metadata);
+ expectPartitions(ctx, table, List.of());
+ }
+ }
+
+ private ExecutionContext<Object[]> newContext(
+ String nodeName,
+ ColocationGroup colocationGroup,
+ @Nullable PartitionPruningMetadata metadata
+ ) {
+ ClusterNodeImpl node = new ClusterNodeImpl(nodeName, nodeName, new
NetworkAddress("localhost", 1234));
+
+ Long2ObjectMap<ColocationGroup> map = new Long2ObjectOpenHashMap<>();
+ map.put(GROUP_ID, colocationGroup);
+
+ return TestBuilders.executionContext()
+ .queryId(UUID.randomUUID())
+ .executor(queryTaskExecutor)
+ .fragment(new FragmentDescription(1, false, map, null, null,
metadata))
+ .localNode(node)
+ .build();
+ }
+
+ private static PartitionPruningMetadata newMetadata(long sourceId) {
+ RexNode expr = Commons.rexBuilder().makeLiteral(1,
Commons.typeFactory().createSqlType(SqlTypeName.INTEGER));
+ PartitionPruningColumns columns = new
PartitionPruningColumns(List.of(Int2ObjectMaps.singleton(0, expr)));
+
+ return new
PartitionPruningMetadata(Long2ObjectMaps.singleton(sourceId, columns));
+ }
+
+ private static void expectPartitions(
+ ExecutionContext<Object[]> ctx,
+ IgniteTable table,
+ List<PartitionWithConsistencyToken> expected
+ ) {
+ ColocationGroup group = ctx.group(GROUP_ID);
+ assertNotNull(group, "no group does not exist: " + GROUP_ID);
+
+ PartitionProvider<Object[]> partitionProvider =
ctx.getPartitionProvider(1, group, table);
+
+ List<PartitionWithConsistencyToken> actual =
partitionProvider.getPartitions(ctx);
+ assertEquals(expected, actual, "Node: " + ctx.localNode().name());
+ }
+}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImplTest.java
index de08c93b80..e51b16ed61 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImplTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImplTest.java
@@ -94,7 +94,7 @@ public class ExpressionFactoryImplTest extends
BaseIgniteAbstractTest {
typeFactory = Commons.typeFactory();
FragmentDescription fragmentDescription = new FragmentDescription(1,
true,
- Long2ObjectMaps.emptyMap(), null, null);
+ Long2ObjectMaps.emptyMap(), null, null, null);
ExecutionContext<Object[]> ctx = TestBuilders.executionContext()
.queryId(UUID.randomUUID())
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java
index 3f2d891a12..569f2864ff 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java
@@ -209,6 +209,10 @@ public class FragmentMappingTest extends
AbstractPlannerTest {
addTableIdent("NT", "N1");
addTableIdent("NT", "N2");
+ addTable("T1", "N0", "N1", "N2");
+ addTable("T2", "N0", "N1", "N2");
+ addTable("T3", "N0", "N1", "N2");
+
testRunner.runTest(this::initSchema, "correlated.test");
}
@@ -229,7 +233,7 @@ public class FragmentMappingTest extends
AbstractPlannerTest {
@Test
public void testPartitionPruning() {
- addNodes("N1", "N2", "N3", "N4", "N5");
+ addNodes("N0", "N1", "N2", "N3", "N4", "N5");
addTable("T1", "N1", "N2", "N3");
addTable("T2", "N4", "N5");
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentPrinter.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentPrinter.java
index 25116f531c..4577a1eaca 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentPrinter.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentPrinter.java
@@ -26,12 +26,15 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.stream.Collectors;
+import org.apache.calcite.rex.RexNode;
import
org.apache.ignite.internal.sql.engine.exec.PartitionWithConsistencyToken;
import org.apache.ignite.internal.sql.engine.prepare.Fragment;
import org.apache.ignite.internal.sql.engine.prepare.IgniteRelShuttle;
+import
org.apache.ignite.internal.sql.engine.prepare.pruning.PartitionPruningMetadata;
import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
import org.apache.ignite.internal.sql.engine.rel.IgniteReceiver;
import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
@@ -196,6 +199,25 @@ final class FragmentPrinter extends IgniteRelShuttle {
output.writeNewline();
}
+ PartitionPruningMetadata pruningMetadata =
mappedFragment.partitionPruningMetadata();
+ if (pruningMetadata != null) {
+ output.appendPadding();
+ output.writeKeyValue("pruningMetadata",
pruningMetadata.data().long2ObjectEntrySet()
+ .stream()
+ .map(e -> {
+ List<Map<Integer, RexNode>> columns =
e.getValue().columns().stream()
+ .map(TreeMap::new)
+ .collect(Collectors.toList());
+
+ return Map.entry(e.getLongKey(), columns);
+ })
+ .sorted(Entry.comparingByKey())
+ .collect(Collectors.toList())
+ .toString()
+ );
+ output.writeNewline();
+ }
+
output.appendPadding();
output.writeString("tree:");
output.writeNewline();
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
index 2fae8e9bfb..804a67a9b1 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
@@ -123,7 +123,7 @@ public abstract class AbstractExecutionTest<T> extends
IgniteAbstractTest {
}
protected FragmentDescription getFragmentDescription() {
- return new FragmentDescription(0, true, Long2ObjectMaps.emptyMap(),
null, null);
+ return new FragmentDescription(0, true, Long2ObjectMaps.emptyMap(),
null, null, null);
}
protected Object[] row(Object... fields) {
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
index 36180890e1..a800b03760 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
@@ -499,7 +499,7 @@ public class ExchangeExecutionTest extends
AbstractExecutionTest<Object[]> {
ExecutionContext<Object[]> targetCtx = TestBuilders.executionContext()
.queryId(queryId)
.executor(taskExecutor)
- .fragment(new FragmentDescription(TARGET_FRAGMENT_ID, true,
Long2ObjectMaps.emptyMap(), null, null))
+ .fragment(new FragmentDescription(TARGET_FRAGMENT_ID, true,
Long2ObjectMaps.emptyMap(), null, null, null))
.localNode(localNode)
.build();
@@ -543,7 +543,7 @@ public class ExchangeExecutionTest extends
AbstractExecutionTest<Object[]> {
ExecutionContext<Object[]> sourceCtx = TestBuilders.executionContext()
.queryId(queryId)
.executor(taskExecutor)
- .fragment(new FragmentDescription(SOURCE_FRAGMENT_ID, true,
Long2ObjectMaps.emptyMap(), null, null))
+ .fragment(new FragmentDescription(SOURCE_FRAGMENT_ID, true,
Long2ObjectMaps.emptyMap(), null, null, null))
.localNode(localNode)
.build();
@@ -575,7 +575,7 @@ public class ExchangeExecutionTest extends
AbstractExecutionTest<Object[]> {
ExecutionContext<Object[]> sourceCtx = TestBuilders.executionContext()
.queryId(queryId)
.executor(taskExecutor)
- .fragment(new FragmentDescription(SOURCE_FRAGMENT_ID, true,
Long2ObjectMaps.emptyMap(), null, null))
+ .fragment(new FragmentDescription(SOURCE_FRAGMENT_ID, true,
Long2ObjectMaps.emptyMap(), null, null, null))
.localNode(localNode)
.build();
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java
index f980bfceb1..e86e62369f 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java
@@ -39,6 +39,7 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory.Builder;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.PartitionProvider;
import
org.apache.ignite.internal.sql.engine.exec.PartitionWithConsistencyToken;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
@@ -208,8 +209,9 @@ public class IndexScanNodeExecutionTest extends
AbstractExecutionTest<Object[]>
RowFactory<Object[]> rowFactory = ctx.rowHandler().factory(rowSchema);
SingleRangeIterable<Object[]> conditions = new
SingleRangeIterable<>(new Object[]{}, null, false, false);
List<PartitionWithConsistencyToken> partitions =
scannableTable.getPartitions();
+ PartitionProvider<Object[]> partitionProvider =
PartitionProvider.fromPartitions(partitions);
- return new IndexScanNode<>(ctx, rowFactory, indexDescriptor,
scannableTable, tableDescriptor, partitions,
+ return new IndexScanNode<>(ctx, rowFactory, indexDescriptor,
scannableTable, tableDescriptor, partitionProvider,
comparator, conditions, null, null, null);
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNodeExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNodeExecutionTest.java
index 001e508f01..a81535c525 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNodeExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNodeExecutionTest.java
@@ -285,6 +285,6 @@ public class ModifyNodeExecutionTest extends
AbstractExecutionTest<RowWrapper> {
@Override
protected FragmentDescription getFragmentDescription() {
ColocationGroup colocationGroup = new ColocationGroup(List.of(),
List.of(), List.of());
- return new FragmentDescription(0, true,
Long2ObjectMaps.singleton(SOURCE_ID, colocationGroup), null, null);
+ return new FragmentDescription(0, true,
Long2ObjectMaps.singleton(SOURCE_ID, colocationGroup), null, null, null);
}
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
index 6d77d603a9..b24e441924 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
import org.apache.ignite.internal.schema.BinaryTuplePrefix;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.PartitionProvider;
import
org.apache.ignite.internal.sql.engine.exec.PartitionWithConsistencyToken;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
@@ -185,8 +186,9 @@ public class TableScanNodeExecutionTest extends
AbstractExecutionTest<Object[]>
}
};
ScannableTableImpl scanableTable = new
ScannableTableImpl(internalTable, rf -> rowConverter);
+ PartitionProvider<Object[]> partitionProvider =
PartitionProvider.fromPartitions(partsWithConsistencyTokens);
TableScanNode<Object[]> scanNode = new TableScanNode<>(ctx,
rowFactory, scanableTable,
- partsWithConsistencyTokens, null, null, null);
+ partitionProvider, null, null, null);
RootNode<Object[]> root = new RootNode<>(ctx);
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index 040f70f24e..6fc2dafefd 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -101,6 +101,7 @@ import
org.apache.ignite.internal.sql.engine.schema.TableDescriptorImpl;
import org.apache.ignite.internal.sql.engine.sql.ParserServiceImpl;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.sql.engine.util.EmptyCacheFactory;
import org.apache.ignite.internal.sql.engine.util.cache.CaffeineCacheFactory;
import org.apache.ignite.internal.systemview.SystemViewManagerImpl;
@@ -113,6 +114,7 @@ import org.apache.ignite.internal.type.NativeTypeSpec;
import org.apache.ignite.internal.type.NumberNativeType;
import org.apache.ignite.internal.type.TemporalNativeType;
import org.apache.ignite.internal.type.VarlenNativeType;
+import org.apache.ignite.internal.util.ArrayUtils;
import org.apache.ignite.internal.util.SubscriptionUtils;
import org.apache.ignite.internal.util.TransformingIterator;
import org.apache.ignite.internal.util.subscription.TransformingPublisher;
@@ -450,6 +452,9 @@ public class TestBuilders {
/** Sets the node this fragment will be executed on. */
ExecutionContextBuilder localNode(ClusterNode node);
+ /** Sets the dynamic parameters this fragment will be executed with. */
+ ExecutionContextBuilder dynamicParameters(Object... params);
+
/**
* Builds the context object.
*
@@ -459,11 +464,12 @@ public class TestBuilders {
}
private static class ExecutionContextBuilderImpl implements
ExecutionContextBuilder {
- private FragmentDescription description = new FragmentDescription(0,
true, null, null, null);
+ private FragmentDescription description = new FragmentDescription(0,
true, null, null, null, null);
private UUID queryId = null;
private QueryTaskExecutor executor = null;
private ClusterNode node = null;
+ private Object[] dynamicParams = ArrayUtils.OBJECT_EMPTY_ARRAY;
/** {@inheritDoc} */
@Override
@@ -497,6 +503,12 @@ public class TestBuilders {
return this;
}
+ @Override // Implements ExecutionContextBuilder#dynamicParameters.
+ public ExecutionContextBuilder dynamicParameters(Object... params) {
+ this.dynamicParams = params; // Stored by reference (no copy); later handed to Commons.parametersMap when the context is created.
+ return this;
+ }
+
/** {@inheritDoc} */
@Override
public ExecutionContext<Object[]> build() {
@@ -507,7 +519,7 @@ public class TestBuilders {
node.name(),
description,
ArrayRowHandler.INSTANCE,
- Map.of(),
+ Commons.parametersMap(dynamicParams),
TxAttributes.fromTx(new NoOpTransaction(node.name())),
SqlQueryProcessor.DEFAULT_TIME_ZONE_ID
);
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PartitionPruningTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PartitionPruningTest.java
index 945fa74465..192a31e80d 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PartitionPruningTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PartitionPruningTest.java
@@ -190,6 +190,32 @@ public class PartitionPruningTest extends
AbstractPlannerTest {
assertEquals(SqlTypeName.VARCHAR, lit2.getType().getSqlTypeName());
}
+ @Test // Checks the pruning metadata extracted for a scan whose key is compared with a correlated variable.
+ public void testCorrelatedQuery() throws Exception {
+ IgniteTable table1 = TestBuilders.table()
+ .name("T1")
+ .addKeyColumn("C1", NativeTypes.INT32)
+ .addColumn("C2", NativeTypes.INT32, false)
+ .distribution(IgniteDistributions.affinity(List.of(0), 1, 2))
+ .build();
+ // T2 has the same key column C1 and the same affinity distribution on column 0; the subquery reads from it.
+ IgniteTable table2 = TestBuilders.table()
+ .name("T2")
+ .addKeyColumn("C1", NativeTypes.INT32)
+ .addColumn("C2", NativeTypes.INT32)
+ .distribution(IgniteDistributions.affinity(List.of(0), 1, 2))
+ .build();
+ // An explicit extractor instance is passed to the extractMetadata overload that accepts one.
+ PartitionPruningMetadataExtractor extractor = new
PartitionPruningMetadataExtractor();
+ // The correlated comparison is OR-ed with a literal comparison, so two alternative key values are expected.
+ PartitionPruningMetadata actual = extractMetadata(extractor,
+ "SELECT * FROM t1 as cor WHERE EXISTS (SELECT 1 FROM t2 WHERE
t2.c1 = cor.c1 OR t2.c1=42)", table1, table2);
+ // Metadata is looked up for source 2 (presumably the scan of T2 - confirm): one column set per OR branch.
+ PartitionPruningColumns cols = actual.get(2);
+ assertNotNull(cols, "No metadata for source=2");
+ assertEquals("[[0=$cor0.C1], [0=42]]",
PartitionPruningColumns.canonicalForm(cols).toString());
+ }
+
private PartitionPruningMetadata extractMetadata(String query,
IgniteTable... table) throws Exception {
PartitionPruningMetadataExtractor extractor = new
PartitionPruningMetadataExtractor();
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningPredicateSelfTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningPredicateSelfTest.java
index bd2b15945d..ca29b0d4ef 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningPredicateSelfTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningPredicateSelfTest.java
@@ -30,6 +30,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -42,8 +43,12 @@ import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.DateString;
import org.apache.calcite.util.TimeString;
import org.apache.calcite.util.TimestampString;
+import org.apache.ignite.internal.network.ClusterNodeImpl;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.NodeWithConsistencyToken;
import
org.apache.ignite.internal.sql.engine.exec.PartitionWithConsistencyToken;
+import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
+import org.apache.ignite.internal.sql.engine.exec.exp.ExpressionFactory;
import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup;
import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
@@ -60,11 +65,13 @@ import org.apache.ignite.internal.sql.engine.util.TypeUtils;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.type.NativeType;
import org.apache.ignite.internal.type.NativeTypes;
+import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.sql.ColumnType;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.Mockito;
/**
* Tests for {@link PartitionPruningPredicate}.
@@ -100,7 +107,7 @@ public class PartitionPruningPredicateSelfTest extends
BaseIgniteAbstractTest {
IgniteDistribution distribution =
IgniteDistributions.affinity(List.of(0), 1, 1);
- NativeType nativeType = TypeUtils.columnType2NativeType(columnType, 2,
2, 2);
+ NativeType nativeType = TypeUtils.columnType2NativeType(columnType, 4,
2, 4);
IgniteTable table = TestBuilders.table()
.name("T")
@@ -115,13 +122,12 @@ public class PartitionPruningPredicateSelfTest extends
BaseIgniteAbstractTest {
RexNode expr = generateLiteralOrValueExpr(columnType, val);
PartitionPruningColumns columns = new
PartitionPruningColumns(List.of(Int2ObjectMaps.singleton(fieldIndex, expr)));
- PartitionPruningPredicate predicate = new
PartitionPruningPredicate(table, columns, new Object[0]);
List<String> nodeNames = List.of("n1", "n2", "n3");
List<NodeWithConsistencyToken> assignments = randomAssignments(table,
nodeNames);
ColocationGroup group = new ColocationGroup(List.of(0L), nodeNames,
assignments);
- expectPartitionsPruned(table, predicate, group, val);
+ expectPartitionsPruned(table, columns, new Object[0], group, val);
}
@ParameterizedTest
@@ -129,7 +135,7 @@ public class PartitionPruningPredicateSelfTest extends
BaseIgniteAbstractTest {
public void testDynamicParam(ColumnType columnType) {
IgniteDistribution distribution =
IgniteDistributions.affinity(List.of(0), 1, 1);
- NativeType nativeType = TypeUtils.columnType2NativeType(columnType, 2,
2, 2);
+ NativeType nativeType = TypeUtils.columnType2NativeType(columnType, 4,
2, 4);
IgniteTable table = TestBuilders.table()
.name("T")
@@ -145,18 +151,18 @@ public class PartitionPruningPredicateSelfTest extends
BaseIgniteAbstractTest {
Object[] dynamicParameters = {val};
PartitionPruningColumns columns = new
PartitionPruningColumns(List.of(Int2ObjectMaps.singleton(fieldIndex, expr)));
- PartitionPruningPredicate predicate = new
PartitionPruningPredicate(table, columns, dynamicParameters);
List<String> nodeNames = List.of("n1", "n2", "n3");
List<NodeWithConsistencyToken> assignments = randomAssignments(table,
nodeNames);
ColocationGroup group = new ColocationGroup(List.of(0L), nodeNames,
assignments);
- expectPartitionsPruned(table, predicate, group, val);
+ expectPartitionsPruned(table, columns, dynamicParameters, group, val);
}
private static void expectPartitionsPruned(
IgniteTable table,
- PartitionPruningPredicate predicate,
+ PartitionPruningColumns pruningColumns,
+ Object[] dynamicParameters,
ColocationGroup group,
Object... values
) {
@@ -166,7 +172,7 @@ public class PartitionPruningPredicateSelfTest extends
BaseIgniteAbstractTest {
PartitionWithConsistencyToken expectedPartition =
computeExpectedPartition(table, group.assignments(), values);
// Apply partition pruning to obtain new colocation group.
- ColocationGroup newGroup = predicate.prunePartitions(group);
+ ColocationGroup newGroup =
PartitionPruningPredicate.prunePartitions(table, pruningColumns,
dynamicParameters, group);
String expectedNode =
assignments.get(expectedPartition.partId()).name();
@@ -184,7 +190,28 @@ public class PartitionPruningPredicateSelfTest extends
BaseIgniteAbstractTest {
actual.put(nodeName, actualPartitions);
}
- assertEquals(expected, actual, "partitions per node");
+ assertEquals(expected, actual, "partitions per node (static)");
+
+ // ensure both implementations of prunePartitions produce the same
result.
+
+ Map<String, List<PartitionWithConsistencyToken>> dynamicActual = new
HashMap<>();
+
+ for (String nodeName : group.nodeNames()) {
+ ExecutionContext<Object[]> ctx = TestBuilders.executionContext()
+ .queryId(UUID.randomUUID())
+ .localNode(new ClusterNodeImpl(nodeName, nodeName, new
NetworkAddress("localhost", 123)))
+ .executor(Mockito.mock(QueryTaskExecutor.class))
+ .dynamicParameters(dynamicParameters)
+ .build();
+ ExpressionFactory<Object[]> expressionFactory =
ctx.expressionFactory();
+
+ List<PartitionWithConsistencyToken> result =
PartitionPruningPredicate.prunePartitions(
+ pruningColumns, table, expressionFactory, assignments,
nodeName
+ );
+ dynamicActual.put(nodeName, result);
+ }
+
+ assertEquals(expected, dynamicActual, "partitions per node (dynamic)");
}
private static PartitionWithConsistencyToken computeExpectedPartition(
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PruningMetadataSerializationTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PruningMetadataSerializationTest.java
new file mode 100644
index 0000000000..2033f6dbe9
--- /dev/null
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PruningMetadataSerializationTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.prepare.pruning;
+
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
+import
org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
+import
org.apache.ignite.internal.network.serialization.marshal.DefaultUserObjectMarshaller;
+import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests serialization of {@link PartitionPruningMetadata}.
+ */
+public class PruningMetadataSerializationTest extends BaseIgniteAbstractTest {
+ // Marshalling fixtures: the registry is shared by the descriptor factory and the marshaller created below.
+ private final ClassDescriptorRegistry descriptorRegistry = new
ClassDescriptorRegistry();
+
+ private final ClassDescriptorFactory descriptorFactory = new
ClassDescriptorFactory(descriptorRegistry);
+
+ private final DefaultUserObjectMarshaller marshaller = new
DefaultUserObjectMarshaller(descriptorRegistry, descriptorFactory);
+ /** Marshals metadata holding zero, one and two pruning columns and checks that unmarshalling restores every column set. */
+ @Test
+ public void test() throws Exception {
+ // Expressions used as pruning values: the string literal "1" and the boolean literal true.
+
+ RexBuilder rexBuilder = Commons.rexBuilder();
+ RexNode lit1 = rexBuilder.makeLiteral("1");
+ RexNode lit2 = rexBuilder.makeLiteral(true);
+ // Source 1 gets no column sets, source 2 a single column, source 3 two columns in one set.
+
+ Long2ObjectMap<PartitionPruningColumns> map = new
Long2ObjectOpenHashMap<>();
+
+ map.put(1, new PartitionPruningColumns(List.of()));
+ map.put(2, new PartitionPruningColumns(List.of(
+ Int2ObjectMaps.singleton(0, lit1)
+ )));
+ // A singleton map holds one entry only, so the two-column set is built with a mutable map.
+ Int2ObjectMap<RexNode> cols = new Int2ObjectOpenHashMap<>();
+ cols.put(0, lit2);
+ cols.put(1, lit1);
+ map.put(3, new PartitionPruningColumns(List.of(cols)));
+
+ PartitionPruningMetadata metadata = new PartitionPruningMetadata(map);
+ // Round-trip the metadata through the marshaller.
+ byte[] bytes = marshaller.marshal(metadata).bytes();
+
+ PartitionPruningMetadata fromBytes = marshaller.unmarshal(bytes,
descriptorRegistry);
+ assertNotNull(fromBytes, "Deserialized to null");
+ // Every source must come back with the same key-to-expression mapping it was created with.
+ expectColumns(fromBytes, 1, List.of());
+ expectColumns(fromBytes, 2, List.of(Map.of(0, lit1)));
+ expectColumns(fromBytes, 3, List.of(Map.of(0, lit2, 1, lit1)));
+ }
+ /** Asserts that {@code metadata} has an entry for {@code sourceId} whose column sets equal {@code cols}. */
+ private static void expectColumns(
+ PartitionPruningMetadata metadata,
+ long sourceId,
+ List<Map<Integer, RexNode>> cols
+ ) {
+ PartitionPruningColumns columns = metadata.get(sourceId);
+ assertNotNull(columns, format("No metadata for source#{}: {}",
sourceId, metadata));
+
+ assertEquals(cols, columns.columns(), format("Metadata for source#{}
does not match", sourceId));
+ }
+}
diff --git a/modules/sql-engine/src/test/resources/mapping/correlated.test
b/modules/sql-engine/src/test/resources/mapping/correlated.test
index 584c9f903b..09750faf36 100644
--- a/modules/sql-engine/src/test/resources/mapping/correlated.test
+++ b/modules/sql-engine/src/test/resources/mapping/correlated.test
@@ -199,3 +199,138 @@ Fragment#1
Sender(targetFragment=0, exchange=1, distribution=single)
TableScan(name=PUBLIC.T_N1, source=3, partitions=1,
distribution=affinity[table: T_N1, columns: [ID]])
---
+# Pass partition pruning metadata for correlated joins.
+N0
+SELECT * FROM t1_n0n1n2 as cor WHERE EXISTS (SELECT 1 FROM t2_n0n1n2 as t2
WHERE t2.id = cor.id)
+---
+Fragment#0 root
+ executionNodes: [N0]
+ remoteFragments: [1, 2]
+ exchangeSourceNodes: {1=[N0, N1, N2], 2=[N0, N1, N2]}
+ tree:
+ Project
+ CorrelatedNestedLoopJoin
+ Receiver(sourceFragment=1, exchange=1, distribution=single)
+ ColocatedHashAggregate
+ Receiver(sourceFragment=2, exchange=2, distribution=single)
+
+Fragment#2 correlated
+ targetNodes: [N0]
+ executionNodes: [N0, N1, N2]
+ tables: [T2_N0N1N2]
+ partitions: {N0=[0:3], N1=[1:3], N2=[2:3]}
+ pruningMetadata: [3=[{0=$cor0.ID}]]
+ tree:
+ Sender(targetFragment=0, exchange=2, distribution=single)
+ TableScan(name=PUBLIC.T2_N0N1N2, source=3, partitions=3,
distribution=random)
+
+Fragment#1
+ targetNodes: [N0]
+ executionNodes: [N0, N1, N2]
+ tables: [T1_N0N1N2]
+ partitions: {N0=[0:3], N1=[1:3], N2=[2:3]}
+ tree:
+ Sender(targetFragment=0, exchange=1, distribution=single)
+ TableScan(name=PUBLIC.T1_N0N1N2, source=4, partitions=3,
distribution=affinity[table: T1_N0N1N2, columns: [ID]])
+---
+# Pass partition pruning metadata to nested correlated joins.
+N0
+SELECT * FROM t3_n0n1n2 AS out
+WHERE EXISTS (SELECT * FROM t1_n0n1n2 as cor WHERE out.id = cor.id AND EXISTS
(SELECT 1 FROM t2_n0n1n2 as t2 WHERE t2.id = cor.id))
+---
+Fragment#0 root
+ executionNodes: [N0]
+ remoteFragments: [1, 2, 3]
+ exchangeSourceNodes: {1=[N0, N1, N2], 2=[N0, N1, N2], 3=[N0, N1, N2]}
+ tree:
+ Project
+ CorrelatedNestedLoopJoin
+ Receiver(sourceFragment=1, exchange=1, distribution=single)
+ ColocatedHashAggregate
+ Project
+ CorrelatedNestedLoopJoin
+ Receiver(sourceFragment=2, exchange=2, distribution=single)
+ ColocatedHashAggregate
+ Receiver(sourceFragment=3, exchange=3, distribution=single)
+
+Fragment#3 correlated
+ targetNodes: [N0]
+ executionNodes: [N0, N1, N2]
+ tables: [T2_N0N1N2]
+ partitions: {N0=[0:3], N1=[1:3], N2=[2:3]}
+ pruningMetadata: [4=[{0=$cor1.ID}]]
+ tree:
+ Sender(targetFragment=0, exchange=3, distribution=single)
+ TableScan(name=PUBLIC.T2_N0N1N2, source=4, partitions=3,
distribution=random)
+
+Fragment#2 correlated
+ targetNodes: [N0]
+ executionNodes: [N0, N1, N2]
+ tables: [T1_N0N1N2]
+ partitions: {N0=[0:3], N1=[1:3], N2=[2:3]}
+ pruningMetadata: [5=[{0=$cor0.ID}]]
+ tree:
+ Sender(targetFragment=0, exchange=2, distribution=single)
+ TableScan(name=PUBLIC.T1_N0N1N2, source=5, partitions=3,
distribution=affinity[table: T1_N0N1N2, columns: [ID]])
+
+Fragment#1
+ targetNodes: [N0]
+ executionNodes: [N0, N1, N2]
+ tables: [T3_N0N1N2]
+ partitions: {N0=[0:3], N1=[1:3], N2=[2:3]}
+ tree:
+ Sender(targetFragment=0, exchange=1, distribution=single)
+ TableScan(name=PUBLIC.T3_N0N1N2, source=6, partitions=3,
distribution=affinity[table: T3_N0N1N2, columns: [ID]])
+---
+# Pass partition pruning metadata to correlated joins one layer deep.
+N0
+SELECT * FROM t3_n0n1n2 AS out
+WHERE EXISTS (
+ SELECT * FROM t1_n0n1n2 as cor
+ WHERE out.id = cor.id AND EXISTS (SELECT 1 FROM t2_n0n1n2 as t2 WHERE t2.id
= out.id or t2.id=cor.id)
+)
+---
+Fragment#0 root
+ executionNodes: [N0]
+ remoteFragments: [1, 2, 3]
+ exchangeSourceNodes: {1=[N0, N1, N2], 2=[N0, N1, N2], 3=[N0, N1, N2]}
+ tree:
+ Project
+ CorrelatedNestedLoopJoin
+ Receiver(sourceFragment=1, exchange=1, distribution=single)
+ ColocatedHashAggregate
+ Project
+ CorrelatedNestedLoopJoin
+ Receiver(sourceFragment=2, exchange=2, distribution=single)
+ ColocatedHashAggregate
+ Receiver(sourceFragment=3, exchange=3, distribution=single)
+
+Fragment#3 correlated
+ targetNodes: [N0]
+ executionNodes: [N0, N1, N2]
+ tables: [T2_N0N1N2]
+ partitions: {N0=[0:3], N1=[1:3], N2=[2:3]}
+ pruningMetadata: [4=[{0=$cor0.ID}, {0=$cor2.ID}]]
+ tree:
+ Sender(targetFragment=0, exchange=3, distribution=single)
+ TableScan(name=PUBLIC.T2_N0N1N2, source=4, partitions=3,
distribution=random)
+
+Fragment#2 correlated
+ targetNodes: [N0]
+ executionNodes: [N0, N1, N2]
+ tables: [T1_N0N1N2]
+ partitions: {N0=[0:3], N1=[1:3], N2=[2:3]}
+ pruningMetadata: [5=[{0=$cor0.ID}]]
+ tree:
+ Sender(targetFragment=0, exchange=2, distribution=single)
+ TableScan(name=PUBLIC.T1_N0N1N2, source=5, partitions=3,
distribution=affinity[table: T1_N0N1N2, columns: [ID]])
+
+Fragment#1
+ targetNodes: [N0]
+ executionNodes: [N0, N1, N2]
+ tables: [T3_N0N1N2]
+ partitions: {N0=[0:3], N1=[1:3], N2=[2:3]}
+ tree:
+ Sender(targetFragment=0, exchange=1, distribution=single)
+ TableScan(name=PUBLIC.T3_N0N1N2, source=6, partitions=3,
distribution=affinity[table: T3_N0N1N2, columns: [ID]])
+---
diff --git
a/modules/sql-engine/src/test/resources/mapping/test_partition_pruning.test
b/modules/sql-engine/src/test/resources/mapping/test_partition_pruning.test
index 9b8ce7d85f..1451a4d63b 100644
--- a/modules/sql-engine/src/test/resources/mapping/test_partition_pruning.test
+++ b/modules/sql-engine/src/test/resources/mapping/test_partition_pruning.test
@@ -108,3 +108,75 @@ Fragment#1
Sort
TableScan(name=PUBLIC.T1_N1N2N3, source=3, partitions=3,
distribution=affinity[table: T1_N1N2N3, columns: [ID]])
---
+# Correlated.
+# Prune partitions from the left arm statically, and pass pruning metadata to the right arm.
+# Same set of nodes.
+N0
+SELECT * FROM t1_n1n2n3 as cor WHERE cor.id = 42 and EXISTS (SELECT 1 FROM t3_n1n2n3 as t2 WHERE t2.id = cor.id)
+---
+Fragment#0 root
+ executionNodes: [N0]
+ remoteFragments: [1, 2]
+ exchangeSourceNodes: {1=[N3], 2=[N1, N2, N3]}
+ tree:
+ Project
+ CorrelatedNestedLoopJoin
+ Receiver(sourceFragment=1, exchange=1, distribution=single)
+ ColocatedHashAggregate
+ Receiver(sourceFragment=2, exchange=2, distribution=single)
+
+Fragment#2 correlated
+ targetNodes: [N0]
+ executionNodes: [N1, N2, N3]
+ tables: [T3_N1N2N3]
+ partitions: {N1=[0:3], N2=[1:3], N3=[2:3]}
+ pruningMetadata: [3=[{0=$cor0.ID}]]
+ tree:
+ Sender(targetFragment=0, exchange=2, distribution=single)
+ TableScan(name=PUBLIC.T3_N1N2N3, source=3, partitions=3,
distribution=random)
+
+Fragment#1
+ targetNodes: [N0]
+ executionNodes: [N3]
+ tables: [T1_N1N2N3]
+ partitions: {N3=[2:3]}
+ tree:
+ Sender(targetFragment=0, exchange=1, distribution=single)
+ TableScan(name=PUBLIC.T1_N1N2N3, source=4, partitions=3,
distribution=affinity[table: T1_N1N2N3, columns: [ID]])
+---
+# Correlated.
+# Prune partitions from the left arm statically, and pass pruning metadata to the right arm.
+# Different sets of nodes.
+N0
+SELECT * FROM t1_n1n2n3 as cor WHERE cor.id = 42 and EXISTS (SELECT 1 FROM t2_n4n5 as t2 WHERE t2.id = cor.id)
+---
+Fragment#0 root
+ executionNodes: [N0]
+ remoteFragments: [1, 2]
+ exchangeSourceNodes: {1=[N3], 2=[N4, N5]}
+ tree:
+ Project
+ CorrelatedNestedLoopJoin
+ Receiver(sourceFragment=1, exchange=1, distribution=single)
+ ColocatedHashAggregate
+ Receiver(sourceFragment=2, exchange=2, distribution=single)
+
+Fragment#2 correlated
+ targetNodes: [N0]
+ executionNodes: [N4, N5]
+ tables: [T2_N4N5]
+ partitions: {N4=[0:2], N5=[1:2]}
+ pruningMetadata: [3=[{0=$cor0.ID}]]
+ tree:
+ Sender(targetFragment=0, exchange=2, distribution=single)
+ TableScan(name=PUBLIC.T2_N4N5, source=3, partitions=2,
distribution=random)
+
+Fragment#1
+ targetNodes: [N0]
+ executionNodes: [N3]
+ tables: [T1_N1N2N3]
+ partitions: {N3=[2:3]}
+ tree:
+ Sender(targetFragment=0, exchange=1, distribution=single)
+ TableScan(name=PUBLIC.T1_N1N2N3, source=4, partitions=3,
distribution=affinity[table: T1_N1N2N3, columns: [ID]])
+---