This is an automated email from the ASF dual-hosted git repository.

jooger pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 5a0369a75f IGNITE-21311: Introduce pruning for SQL correlated scans 
(#3318)
5a0369a75f is described below

commit 5a0369a75ff59d03bc07835832dbbaac5aa0e88d
Author: Max Zhuravkov <[email protected]>
AuthorDate: Wed Mar 13 15:39:42 2024 +0200

    IGNITE-21311: Introduce pruning for SQL correlated scans (#3318)
---
 .../benchmark/SqlPartitionPruningBenchmark.java    |  62 +++++-
 .../internal/sql/engine/ItCorrelatesTest.java      |  56 ++++-
 .../sql/engine/exec/DynamicPartitionProvider.java  |  59 ++++++
 .../internal/sql/engine/exec/ExecutionContext.java |  16 ++
 .../sql/engine/exec/ExecutionServiceImpl.java      |   5 +-
 .../sql/engine/exec/LogicalRelImplementor.java     |  15 +-
 .../sql/engine/exec/PartitionProvider.java         |  61 ++++++
 .../sql/engine/exec/StaticPartitionProvider.java   |  45 ++++
 .../sql/engine/exec/mapping/ColocationGroup.java   |  14 +-
 .../engine/exec/mapping/FragmentDescription.java   |  13 +-
 .../sql/engine/exec/mapping/MappedFragment.java    |  23 +-
 .../engine/exec/mapping/MappingServiceImpl.java    |   3 +-
 .../sql/engine/exec/rel/IndexScanNode.java         |  20 +-
 .../sql/engine/exec/rel/TableScanNode.java         |  21 +-
 .../sql/engine/externalize/RelJsonReader.java      |  25 ++-
 .../sql/engine/externalize/RelJsonWriter.java      |  17 ++
 .../prepare/pruning/PartitionPrunerImpl.java       |  17 +-
 .../prepare/pruning/PartitionPruningColumns.java   |  91 +++++++-
 .../prepare/pruning/PartitionPruningMetadata.java  |   5 +-
 .../pruning/PartitionPruningMetadataExtractor.java |  17 +-
 .../prepare/pruning/PartitionPruningPredicate.java |  78 +++++--
 .../sql/engine/exec/PartitionProvidersTest.java    | 234 +++++++++++++++++++++
 .../engine/exec/exp/ExpressionFactoryImplTest.java |   2 +-
 .../engine/exec/mapping/FragmentMappingTest.java   |   6 +-
 .../sql/engine/exec/mapping/FragmentPrinter.java   |  22 ++
 .../sql/engine/exec/rel/AbstractExecutionTest.java |   2 +-
 .../sql/engine/exec/rel/ExchangeExecutionTest.java |   6 +-
 .../exec/rel/IndexScanNodeExecutionTest.java       |   4 +-
 .../engine/exec/rel/ModifyNodeExecutionTest.java   |   2 +-
 .../exec/rel/TableScanNodeExecutionTest.java       |   4 +-
 .../sql/engine/framework/TestBuilders.java         |  16 +-
 .../sql/engine/planner/PartitionPruningTest.java   |  26 +++
 .../pruning/PartitionPruningPredicateSelfTest.java |  45 +++-
 .../pruning/PruningMetadataSerializationTest.java  |  94 +++++++++
 .../src/test/resources/mapping/correlated.test     | 135 ++++++++++++
 .../resources/mapping/test_partition_pruning.test  |  72 +++++++
 36 files changed, 1227 insertions(+), 106 deletions(-)

diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlPartitionPruningBenchmark.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlPartitionPruningBenchmark.java
index bd56cdf6c0..baf68f2b9a 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlPartitionPruningBenchmark.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlPartitionPruningBenchmark.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.benchmark;
 
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
@@ -85,10 +87,28 @@ public class SqlPartitionPruningBenchmark extends 
AbstractMultiNodeBenchmark {
                 List.of("key1")
         );
 
-        KeyValueView<Tuple, Tuple> keyValueView = 
clusterNode.tables().table("usertable2").keyValueView();
+        createTable("usertable3",
+                List.of(
+                        "key1 int",
+                        "key2 int",
+                        "field1   varchar(100)"
+                ),
+                List.of("key1", "key2"),
+                List.of("key1")
+        );
+
+        initTable("usertable2", 10);
+        initTable("usertable3", 1);
+
+        session = clusterNode.sql().createSession();
+    }
+
+    private static void initTable(String tableName, int fieldCount) {
+        KeyValueView<Tuple, Tuple> keyValueView = 
clusterNode.tables().table(tableName).keyValueView();
 
         try (Session session = clusterNode.sql().createSession()) {
-            try (var rs = session.execute(null, "CREATE INDEX 
usertable2_sorted_idx ON usertable2 USING TREE (key1, key2)")) {
+            String query = format("CREATE INDEX {}_sorted_idx ON {} USING TREE 
(key1, key2)", tableName, tableName);
+            try (var rs = session.execute(null, query)) {
                 while (rs.hasNext()) {
                     rs.next();
                 }
@@ -99,7 +119,7 @@ public class SqlPartitionPruningBenchmark extends 
AbstractMultiNodeBenchmark {
 
         for (int i = 0; i < TABLE_SIZE; i++) {
             Tuple t = Tuple.create();
-            for (int j = 1; j <= 10; j++) {
+            for (int j = 1; j <= fieldCount; j++) {
                 t.set("field" + j, FIELD_VAL);
             }
 
@@ -108,8 +128,6 @@ public class SqlPartitionPruningBenchmark extends 
AbstractMultiNodeBenchmark {
 
             keyValueView.put(null, key, t);
         }
-
-        session = clusterNode.sql().createSession();
     }
 
     /** Select by key - should use key value plan. */
@@ -136,7 +154,7 @@ public class SqlPartitionPruningBenchmark extends 
AbstractMultiNodeBenchmark {
 
     /** Select by a single colocation key - should use a scan w/o partition 
pruning because such predicate is too complex. */
     @Benchmark
-    public void selectWithNoPrunining(Blackhole bh) {
+    public void selectWithNoPruning(Blackhole bh) {
         ThreadLocalRandom random = ThreadLocalRandom.current();
         int key = random.nextInt(TABLE_SIZE);
 
@@ -145,12 +163,40 @@ public class SqlPartitionPruningBenchmark extends 
AbstractMultiNodeBenchmark {
         }
     }
 
+    /** Correlated subquery with partition pruning. */
+    @Benchmark
+    public void selectCorrelated(Blackhole bh) {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        int key = random.nextInt(TABLE_SIZE);
+
+        String query = "SELECT * FROM usertable2 as cor WHERE EXISTS "
+                + "(SELECT 1 FROM usertable3 WHERE usertable3.key1 = cor.key1) 
AND key1=?";
+
+        try (var rs = session.execute(null, query, key)) {
+            expectSingleRecord(rs, bh);
+        }
+    }
+
+    /** Correlated subquery without partition pruning. */
+    @Benchmark
+    public void selectCorrelatedNoPruning(Blackhole bh) {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        int key = random.nextInt(TABLE_SIZE);
+
+        String query = "SELECT * FROM usertable2 as cor WHERE EXISTS "
+                + "(SELECT 1 FROM usertable3 WHERE usertable3.key1 >= cor.key1 
AND usertable3.key1 < cor.key1 + 1) AND key1=?";
+
+        try (var rs = session.execute(null, query, key)) {
+            expectSingleRecord(rs, bh);
+        }
+    }
+
     /**
      * Benchmark's entry point.
      */
     public static void main(String[] args) throws RunnerException {
         Options opt = new OptionsBuilder()
-                .include(".*" + 
SqlPartitionPruningBenchmark.class.getSimpleName() + ".*")
+                .include(".*" + 
SqlPartitionPruningBenchmark.class.getSimpleName() + ".selectCorrelated*")
                 .build();
 
         new Runner(opt).run();
@@ -168,7 +214,7 @@ public class SqlPartitionPruningBenchmark extends 
AbstractMultiNodeBenchmark {
             i += 1;
         }
         if (i != 1) {
-            throw new IllegalArgumentException("There should be exactly 1 
output row");
+            throw new IllegalArgumentException("There should be exactly 1 
output row but got " + i);
         }
     }
 }
diff --git 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java
 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java
index bfb84bc68b..99a804b890 100644
--- 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java
+++ 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java
@@ -37,9 +37,6 @@ public class ItCorrelatesTest extends BaseSqlIntegrationTest {
     public void testCorrelatesAssignedBeforeAccess() {
         sql("create table test_tbl(k INTEGER primary key, v INTEGER)");
 
-        // TODO: IGNITE-16323 When the issue is not fixed the invocation 
required for update metadata.
-        CLUSTER.aliveNode().tables().tables();
-
         sql("INSERT INTO test_tbl VALUES (1, 1)");
 
         assertQuery("SELECT " + DISABLED_JOIN_RULES + " t0.v, (SELECT t0.v + 
t1.v FROM test_tbl t1) AS j FROM test_tbl t0")
@@ -85,4 +82,57 @@ public class ItCorrelatesTest extends BaseSqlIntegrationTest 
{
                 .returns(12, 2)
                 .check();
     }
+
+    @Test
+    public void testCorrelations() {
+        sql("CREATE TABLE t1 (id INTEGER PRIMARY KEY, val INTEGER)");
+        sql("CREATE TABLE t2 (id INTEGER PRIMARY KEY, val INTEGER)");
+        sql("CREATE TABLE t3 (id INTEGER PRIMARY KEY, val INTEGER)");
+
+        sql("INSERT INTO t1 VALUES(1, 2)");
+        sql("INSERT INTO t1 VALUES(13, 14)");
+        sql("INSERT INTO t1 VALUES(42, 43)");
+
+        sql("INSERT INTO t2 VALUES(1, 2)");
+        sql("INSERT INTO t2 VALUES(42, 43)");
+
+        sql("INSERT INTO t3 VALUES(1, 11)");
+        sql("INSERT INTO t3 VALUES(13, 23)");
+        sql("INSERT INTO t3 VALUES(42, 52)");
+
+        // t1 -> t2 (t2 references t1)
+
+        assertQuery("SELECT * FROM t1 as cor WHERE EXISTS (SELECT 1 FROM t2 
WHERE t2.id = cor.id)")
+                .returns(1, 2)
+                .returns(42, 43)
+                .check();
+
+        assertQuery("SELECT * FROM t1 as cor WHERE NOT EXISTS (SELECT 1 FROM 
t2 WHERE t2.id = cor.id)")
+                .returns(13, 14)
+                .check();
+
+        // t3 -> t1 -> t2 (t2 references t1)
+
+        assertQuery("SELECT * FROM t3 AS out\n"
+                + "WHERE EXISTS (SELECT * FROM t1 as cor WHERE out.id = cor.id 
AND EXISTS "
+                + "(SELECT 1 FROM t2 WHERE t2.id = cor.id))")
+                .returns(1, 11)
+                .returns(42, 52)
+                .check();
+
+        assertQuery("SELECT * FROM t3 AS out\n"
+                + "WHERE NOT EXISTS (SELECT * FROM t1 as cor WHERE out.id = 
cor.id AND EXISTS "
+                + "(SELECT 1 FROM t2 WHERE t2.id = cor.id))")
+                .returns(13, 23)
+                .check();
+
+        // t3 -> t1 -> t2 (t2 references both t3 and t1)
+
+        assertQuery("SELECT * FROM t3 AS out\n"
+                + "WHERE EXISTS (SELECT * FROM t1 as cor WHERE out.id = cor.id 
AND EXISTS "
+                + "(SELECT 1 FROM t2 WHERE t2.id = out.id OR t2.id = cor.id))")
+                .returns(1, 11)
+                .returns(42, 52)
+                .check();
+    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/DynamicPartitionProvider.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/DynamicPartitionProvider.java
new file mode 100644
index 0000000000..8910043a01
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/DynamicPartitionProvider.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import java.util.List;
+import org.apache.ignite.internal.sql.engine.exec.exp.ExpressionFactory;
+import 
org.apache.ignite.internal.sql.engine.prepare.pruning.PartitionPruningColumns;
+import 
org.apache.ignite.internal.sql.engine.prepare.pruning.PartitionPruningPredicate;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+
+/**
+ * Partition provider that returns partitions based on the values provided by the 
execution context at runtime.
+ */
+public class DynamicPartitionProvider<RowT> implements PartitionProvider<RowT> 
{
+
+    private final String nodeName;
+
+    private final List<NodeWithConsistencyToken> assignments;
+
+    private final PartitionPruningColumns columns;
+
+    private final IgniteTable table;
+
+    /** Constructor. */
+    public DynamicPartitionProvider(
+            String nodeName,
+            List<NodeWithConsistencyToken> assignments,
+            PartitionPruningColumns columns,
+            IgniteTable table
+    ) {
+        this.nodeName = nodeName;
+        this.assignments = assignments;
+        this.columns = columns;
+        this.table = table;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public List<PartitionWithConsistencyToken> 
getPartitions(ExecutionContext<RowT> ctx) {
+        ExpressionFactory<RowT> expressionFactory = ctx.expressionFactory();
+
+        return PartitionPruningPredicate.prunePartitions(columns, table, 
expressionFactory, assignments, nodeName);
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
index bf4a9e1b44..75ca44ef80 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
@@ -43,6 +43,9 @@ import 
org.apache.ignite.internal.sql.engine.exec.exp.ExpressionFactory;
 import org.apache.ignite.internal.sql.engine.exec.exp.ExpressionFactoryImpl;
 import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup;
 import org.apache.ignite.internal.sql.engine.exec.mapping.FragmentDescription;
+import 
org.apache.ignite.internal.sql.engine.prepare.pruning.PartitionPruningColumns;
+import 
org.apache.ignite.internal.sql.engine.prepare.pruning.PartitionPruningMetadata;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
 import org.apache.ignite.internal.sql.engine.util.TypeUtils;
 import org.apache.ignite.network.ClusterNode;
@@ -372,6 +375,19 @@ public class ExecutionContext<RowT> implements DataContext 
{
         return cancelFlag.get();
     }
 
+    /** Creates {@link PartitionProvider} for the given source table. */
+    public PartitionProvider<RowT> getPartitionProvider(long sourceId, 
ColocationGroup group, IgniteTable table) {
+        PartitionPruningMetadata metadata = 
description.partitionPruningMetadata();
+        PartitionPruningColumns columns = metadata != null ? 
metadata.get(sourceId) : null;
+        String nodeName = localNode.name();
+
+        if (columns == null) {
+            return new StaticPartitionProvider<>(nodeName, group, sourceId);
+        } else {
+            return new DynamicPartitionProvider<>(nodeName, 
group.assignments(), columns, table);
+        }
+    }
+
     /** {@inheritDoc} */
     @Override
     public boolean equals(Object o) {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index 6ba934e07f..89a1c07932 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -133,7 +133,7 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
     private static final List<InternalSqlRow> NOT_APPLIED_ANSWER = List.of(new 
InternalSqlRowSingleBoolean(false));
 
     private static final FragmentDescription DUMMY_DESCRIPTION = new 
FragmentDescription(
-            0, true, Long2ObjectMaps.emptyMap(), null, null
+            0, true, Long2ObjectMaps.emptyMap(), null, null, null
     );
 
     private final MessageService messageService;
@@ -922,7 +922,8 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
                                 !fragment.correlated(),
                                 mappedFragment.groupsBySourceId(),
                                 mappedFragment.target(),
-                                mappedFragment.sourcesByExchangeId()
+                                mappedFragment.sourcesByExchangeId(),
+                                mappedFragment.partitionPruningMetadata()
                         );
 
                         for (String nodeName : mappedFragment.nodes()) {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
index 2240e5f80e..6b44790bd7 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.sql.engine.exec;
 
 import static org.apache.calcite.rel.RelDistribution.Type.HASH_DISTRIBUTED;
-import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static 
org.apache.ignite.internal.sql.engine.util.TypeUtils.combinedRowType;
 import static 
org.apache.ignite.internal.sql.engine.util.TypeUtils.rowSchemaFromRelTypes;
 import static org.apache.ignite.internal.util.ArrayUtils.asList;
@@ -392,9 +391,7 @@ public class LogicalRelImplementor<RowT> implements 
IgniteRelVisitor<Node<RowT>>
 
         RowSchema rowSchema = 
rowSchemaFromRelTypes(RelOptUtil.getFieldTypeList(rowType));
         RowFactory<RowT> rowFactory = ctx.rowHandler().factory(rowSchema);
-
-        List<PartitionWithConsistencyToken> partitions = 
group.partitionsWithConsistencyTokens(ctx.localNode().name());
-        assert !partitions.isEmpty() : format("No partitions for node {} 
group: {}", ctx.localNode().name(), group);
+        PartitionProvider<RowT> partitionProvider = 
ctx.getPartitionProvider(rel.sourceId(), group, tbl);
 
         return new IndexScanNode<>(
                 ctx,
@@ -402,7 +399,7 @@ public class LogicalRelImplementor<RowT> implements 
IgniteRelVisitor<Node<RowT>>
                 idx,
                 scannableTable,
                 tbl.descriptor(),
-                partitions,
+                partitionProvider,
                 comp,
                 ranges,
                 filters,
@@ -428,7 +425,8 @@ public class LogicalRelImplementor<RowT> implements 
IgniteRelVisitor<Node<RowT>>
         Predicate<RowT> filters = condition == null ? null : 
expressionFactory.predicate(condition, rowType);
         Function<RowT, RowT> prj = projects == null ? null : 
expressionFactory.project(projects, rowType);
 
-        ColocationGroup group = ctx.group(rel.sourceId());
+        long sourceId = rel.sourceId();
+        ColocationGroup group = ctx.group(sourceId);
 
         assert group != null;
 
@@ -439,14 +437,13 @@ public class LogicalRelImplementor<RowT> implements 
IgniteRelVisitor<Node<RowT>>
         RowSchema rowSchema = 
rowSchemaFromRelTypes(RelOptUtil.getFieldTypeList(rowType));
         RowFactory<RowT> rowFactory = ctx.rowHandler().factory(rowSchema);
 
-        List<PartitionWithConsistencyToken> partitions = 
group.partitionsWithConsistencyTokens(ctx.localNode().name());
-        assert !partitions.isEmpty() : format("No partitions for node {} 
group: {}", ctx.localNode().name(), group);
+        PartitionProvider<RowT> partitionProvider = 
ctx.getPartitionProvider(rel.sourceId(), group, tbl);
 
         return new TableScanNode<>(
                 ctx,
                 rowFactory,
                 scannableTable,
-                partitions,
+                partitionProvider,
                 filters,
                 prj,
                 requiredColumns == null ? null : requiredColumns.toBitSet()
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/PartitionProvider.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/PartitionProvider.java
new file mode 100644
index 0000000000..35e1789e0d
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/PartitionProvider.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Returns a list of partitions for a particular scan operation.
+ */
+@FunctionalInterface
+public interface PartitionProvider<RowT> {
+    /**
+     * Returns a list of partitions for a scan operation.
+     *
+     * @param ctx Execution context.
+     * @return a list of partitions.
+     */
+    List<PartitionWithConsistencyToken> getPartitions(ExecutionContext<RowT> 
ctx);
+
+    /** Returns a partition provider that always returns the given list of 
partitions. */
+    static <RowT> PartitionProvider<RowT> 
fromPartitions(List<PartitionWithConsistencyToken> partitions) {
+        return ctx -> partitions;
+    }
+
+    /**
+     * Returns a list of partitions that belong to the given node.
+     *
+     * @param assignments Assignments.
+     * @param nodeName node name.
+     *
+     * @return List of partitions.
+     */
+    static List<PartitionWithConsistencyToken> 
partitionsForNode(List<NodeWithConsistencyToken> assignments, String nodeName) {
+        List<PartitionWithConsistencyToken> result = new ArrayList<>();
+
+        for (int i = 0; i < assignments.size(); i++) {
+            NodeWithConsistencyToken a = assignments.get(i);
+            if (a.name().equals(nodeName)) {
+                result.add(new PartitionWithConsistencyToken(i, 
a.enlistmentConsistencyToken()));
+            }
+        }
+
+        return result;
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/StaticPartitionProvider.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/StaticPartitionProvider.java
new file mode 100644
index 0000000000..82b3f807fc
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/StaticPartitionProvider.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+
+import java.util.List;
+import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup;
+
+/**
+ * Returns a list of partitions computed at the mapping stage. This provider never 
returns an empty list.
+ */
+ */
+public class StaticPartitionProvider<RowT> implements PartitionProvider<RowT> {
+
+    private final List<PartitionWithConsistencyToken> partitions;
+
+    /** Constructor. */
+    public StaticPartitionProvider(String nodeName, ColocationGroup 
colocationGroup, long sourceId) {
+        List<PartitionWithConsistencyToken> partitions = 
colocationGroup.partitionsWithConsistencyTokens(nodeName);
+        assert !partitions.isEmpty() : format("No partitions for node {} 
group: {}", nodeName, colocationGroup);
+
+        this.partitions = partitions;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public List<PartitionWithConsistencyToken> 
getPartitions(ExecutionContext<RowT> ctx) {
+        return partitions;
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ColocationGroup.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ColocationGroup.java
index 9e60674a24..b3d440518c 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ColocationGroup.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ColocationGroup.java
@@ -18,12 +18,12 @@
 package org.apache.ignite.internal.sql.engine.exec.mapping;
 
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import org.apache.ignite.internal.sql.engine.exec.NodeWithConsistencyToken;
+import org.apache.ignite.internal.sql.engine.exec.PartitionProvider;
 import 
org.apache.ignite.internal.sql.engine.exec.PartitionWithConsistencyToken;
 
 /**
@@ -99,17 +99,7 @@ public class ColocationGroup implements Serializable {
         if (partitionsPerNode != null) {
             return partitionsPerNode.getOrDefault(nodeName, 
Collections.emptyList());
         } else {
-            List<PartitionWithConsistencyToken> partitions = new ArrayList<>();
-
-            for (int p = 0; p < assignments.size(); p++) {
-                NodeWithConsistencyToken nodeWithConsistencyToken = 
assignments.get(p);
-
-                if (Objects.equals(nodeName, nodeWithConsistencyToken.name())) 
{
-                    partitions.add(new PartitionWithConsistencyToken(p, 
nodeWithConsistencyToken.enlistmentConsistencyToken()));
-                }
-            }
-
-            return partitions;
+            return PartitionProvider.partitionsForNode(assignments, nodeName);
         }
     }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentDescription.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentDescription.java
index 10de4a37d5..f9bfbbb883 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentDescription.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentDescription.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.sql.engine.exec.mapping;
 import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
 import java.io.Serializable;
 import java.util.List;
+import 
org.apache.ignite.internal.sql.engine.prepare.pruning.PartitionPruningMetadata;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -35,6 +36,7 @@ public class FragmentDescription implements Serializable {
     private final Long2ObjectMap<ColocationGroup> groupsBySourceId;
     private final @Nullable ColocationGroup target;
     private final @Nullable Long2ObjectMap<List<String>> sourcesByExchangeId;
+    private final @Nullable PartitionPruningMetadata pruningMetadata;
 
     /**
      * Constructor.
@@ -50,13 +52,15 @@ public class FragmentDescription implements Serializable {
             boolean prefetch,
             Long2ObjectMap<ColocationGroup> groupsBySourceId,
             @Nullable ColocationGroup target,
-            @Nullable Long2ObjectMap<List<String>> sourcesByExchangeId
+            @Nullable Long2ObjectMap<List<String>> sourcesByExchangeId,
+            @Nullable PartitionPruningMetadata pruningMetadata
     ) {
         this.fragmentId = fragmentId;
         this.prefetch = prefetch;
         this.groupsBySourceId = groupsBySourceId;
         this.target = target;
         this.sourcesByExchangeId = sourcesByExchangeId;
+        this.pruningMetadata = pruningMetadata;
     }
 
     /** Returns {@code true} if it's safe to execute this fragment in advance. 
*/
@@ -95,4 +99,11 @@ public class FragmentDescription implements Serializable {
     public @Nullable ColocationGroup group(long sourceId) {
         return groupsBySourceId.get(sourceId);
     }
+
+    /**
+     * Returns partition pruning metadata.
+     */
+    public @Nullable PartitionPruningMetadata partitionPruningMetadata() {
+        return pruningMetadata;
+    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappedFragment.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappedFragment.java
index b5ad0f9fe4..ede13d3d86 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappedFragment.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappedFragment.java
@@ -26,6 +26,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import org.apache.ignite.internal.sql.engine.prepare.Fragment;
+import 
org.apache.ignite.internal.sql.engine.prepare.pruning.PartitionPruningMetadata;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -43,13 +44,15 @@ public class MappedFragment {
     private final Long2ObjectMap<ColocationGroup> groupsBySourceId;
     private final @Nullable Long2ObjectMap<List<String>> sourcesByExchangeId;
     private final @Nullable ColocationGroup target;
+    private final PartitionPruningMetadata partitionPruningMetadata;
 
     /** Constructor. */
     MappedFragment(
             Fragment fragment,
             List<ColocationGroup> groups,
             @Nullable Long2ObjectMap<List<String>> sourcesByExchangeId,
-            @Nullable ColocationGroup target
+            @Nullable ColocationGroup target,
+            @Nullable PartitionPruningMetadata pruningMetadata
     ) {
         this.fragment = fragment;
         this.groups = groups;
@@ -69,6 +72,7 @@ public class MappedFragment {
         this.groupsBySourceId = groupsBySourceId;
         this.sourcesByExchangeId = sourcesByExchangeId;
         this.target = target;
+        this.partitionPruningMetadata = pruningMetadata;
     }
 
     /** Constructor. */
@@ -77,7 +81,8 @@ public class MappedFragment {
             List<ColocationGroup> groups, List<String> nodes,
             Long2ObjectMap<ColocationGroup> groupsBySourceId,
             @Nullable Long2ObjectMap<List<String>> sourcesByExchangeId,
-            @Nullable ColocationGroup target
+            @Nullable ColocationGroup target,
+            @Nullable PartitionPruningMetadata pruningMetadata
     ) {
         this.fragment = fragment;
         this.nodes = List.copyOf(nodes);
@@ -85,6 +90,7 @@ public class MappedFragment {
         this.sourcesByExchangeId = sourcesByExchangeId;
         this.target = target;
         this.groups = groups;
+        this.partitionPruningMetadata = pruningMetadata;
     }
 
     public Fragment fragment() {
@@ -111,6 +117,10 @@ public class MappedFragment {
         return sourcesByExchangeId;
     }
 
+    public @Nullable PartitionPruningMetadata partitionPruningMetadata() {
+        return partitionPruningMetadata;
+    }
+
     /**
      * Creates a fragment by replacing the given colocation groups.
      *
@@ -130,7 +140,7 @@ public class MappedFragment {
             }
         }
 
-        return new MappedFragment(fragment, newGroups, sourcesByExchangeId, 
target);
+        return new MappedFragment(fragment, newGroups, sourcesByExchangeId, 
target, partitionPruningMetadata);
     }
 
     /**
@@ -149,6 +159,11 @@ public class MappedFragment {
         newSourcesByExchangeId.put(exchangeId, newNodes);
 
         // The nodes should remain the same in order to preserve connectivity 
between fragments.
-        return new MappedFragment(fragment, groups, nodes, groupsBySourceId, 
newSourcesByExchangeId, target);
+        return new MappedFragment(fragment,  groups, nodes, groupsBySourceId, 
newSourcesByExchangeId, target, partitionPruningMetadata);
+    }
+
+    /** Adds partition pruning metadata to this fragment. */
+    public MappedFragment 
withPartitionPruningMetadata(PartitionPruningMetadata pruningMetadata) {
+        return new MappedFragment(fragment, groups, sourcesByExchangeId, 
target, pruningMetadata);
     }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
index a3402efc42..13bf05cb35 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
@@ -269,7 +269,8 @@ public class MappingServiceImpl implements MappingService, 
LogicalTopologyEventL
                                         fragment,
                                         mapping.groups(),
                                         sourcesByExchangeId,
-                                        targetGroup
+                                        targetGroup,
+                                        null
                                 )
                         );
                     }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
index e85ac8da02..987b8fdf9d 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
@@ -28,6 +28,7 @@ import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.PartitionProvider;
 import 
org.apache.ignite.internal.sql.engine.exec.PartitionWithConsistencyToken;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler;
 import org.apache.ignite.internal.sql.engine.exec.ScannableTable;
@@ -52,8 +53,8 @@ public class IndexScanNode<RowT> extends 
StorageScanNode<RowT> {
 
     private final RowHandler.RowFactory<RowT> factory;
 
-    /** List of pairs containing the partition number to scan with the 
corresponding enlistment consistency token. */
-    private final Collection<PartitionWithConsistencyToken> 
partsWithConsistencyTokens;
+    /** Provider of the partitions to be used by this scan. */
+    private final PartitionProvider<RowT> partitionProvider;
 
     /** Participating columns. */
     private final @Nullable BitSet requiredColumns;
@@ -70,8 +71,7 @@ public class IndexScanNode<RowT> extends 
StorageScanNode<RowT> {
      * @param ctx Execution context.
      * @param rowFactory Row factory.
      * @param tableDescriptor Table descriptor.
-     * @param partsWithConsistencyTokens List of pairs containing the 
partition number to scan with the corresponding enlistment
-     *         consistency token.
+     * @param partitionProvider Partition provider.
      * @param comp Rows comparator.
      * @param rangeConditions Range conditions.
      * @param filters Optional filter to filter out rows.
@@ -84,7 +84,7 @@ public class IndexScanNode<RowT> extends 
StorageScanNode<RowT> {
             IgniteIndex schemaIndex,
             ScannableTable table,
             TableDescriptor tableDescriptor,
-            Collection<PartitionWithConsistencyToken> 
partsWithConsistencyTokens,
+            PartitionProvider<RowT> partitionProvider,
             @Nullable Comparator<RowT> comp,
             @Nullable RangeIterable<RowT> rangeConditions,
             @Nullable Predicate<RowT> filters,
@@ -93,11 +93,9 @@ public class IndexScanNode<RowT> extends 
StorageScanNode<RowT> {
     ) {
         super(ctx, filters, rowTransformer);
 
-        assert partsWithConsistencyTokens != null && 
!partsWithConsistencyTokens.isEmpty();
-
         this.schemaIndex = schemaIndex;
         this.table = table;
-        this.partsWithConsistencyTokens = partsWithConsistencyTokens;
+        this.partitionProvider = partitionProvider;
         this.requiredColumns = requiredColumns;
         this.rangeConditions = rangeConditions;
         this.comp = comp;
@@ -113,11 +111,13 @@ public class IndexScanNode<RowT> extends 
StorageScanNode<RowT> {
     /** {@inheritDoc} */
     @Override
     protected Publisher<RowT> scan() {
+        List<PartitionWithConsistencyToken> partitions = 
partitionProvider.getPartitions(context());
+
         if (rangeConditions != null) {
             return SubscriptionUtils.concat(
-                    new TransformingIterator<>(rangeConditions.iterator(), 
cond -> indexPublisher(partsWithConsistencyTokens, cond)));
+                    new TransformingIterator<>(rangeConditions.iterator(), 
cond -> indexPublisher(partitions, cond)));
         } else {
-            return indexPublisher(partsWithConsistencyTokens, null);
+            return indexPublisher(partitions, null);
         }
     }
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java
index bcfcb1e76f..5425978b95 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java
@@ -18,12 +18,13 @@
 package org.apache.ignite.internal.sql.engine.exec.rel;
 
 import java.util.BitSet;
-import java.util.Collection;
 import java.util.Iterator;
+import java.util.List;
 import java.util.concurrent.Flow.Publisher;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.PartitionProvider;
 import 
org.apache.ignite.internal.sql.engine.exec.PartitionWithConsistencyToken;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
@@ -40,8 +41,8 @@ public class TableScanNode<RowT> extends 
StorageScanNode<RowT> {
     /** Table that provides access to underlying data. */
     private final ScannableTable table;
 
-    /** List of pairs containing the partition number to scan with the 
corresponding enlistment consistency token. */
-    private final Collection<PartitionWithConsistencyToken> 
partsWithConsistencyTokens;
+    /** Provider of the partitions to be used by this scan. */
+    private final PartitionProvider<RowT> partitionProvider;
 
     private final RowFactory<RowT> rowFactory;
 
@@ -53,7 +54,7 @@ public class TableScanNode<RowT> extends 
StorageScanNode<RowT> {
      * @param ctx Execution context.
      * @param rowFactory Row factory.
      * @param table Internal table.
-     * @param partsWithConsistencyTokens List of pairs containing the 
partition number to scan with the corresponding enlistment
+     * @param partitionProvider List of pairs containing the partition number 
to scan with the corresponding enlistment
      *         consistency token.
      * @param filters Optional filter to filter out rows.
      * @param rowTransformer Optional projection function.
@@ -63,17 +64,15 @@ public class TableScanNode<RowT> extends 
StorageScanNode<RowT> {
             ExecutionContext<RowT> ctx,
             RowHandler.RowFactory<RowT> rowFactory,
             ScannableTable table,
-            Collection<PartitionWithConsistencyToken> 
partsWithConsistencyTokens,
+            PartitionProvider<RowT> partitionProvider,
             @Nullable Predicate<RowT> filters,
             @Nullable Function<RowT, RowT> rowTransformer,
             @Nullable BitSet requiredColumns
     ) {
         super(ctx, filters, rowTransformer);
 
-        assert partsWithConsistencyTokens != null && 
!partsWithConsistencyTokens.isEmpty();
-
         this.table = table;
-        this.partsWithConsistencyTokens = partsWithConsistencyTokens;
+        this.partitionProvider = partitionProvider;
         this.rowFactory = rowFactory;
         this.requiredColumns = requiredColumns;
     }
@@ -81,10 +80,10 @@ public class TableScanNode<RowT> extends 
StorageScanNode<RowT> {
     /** {@inheritDoc} */
     @Override
     protected Publisher<RowT> scan() {
+        List<PartitionWithConsistencyToken> partitions = 
partitionProvider.getPartitions(context());
+
         Iterator<Publisher<? extends RowT>> it = new TransformingIterator<>(
-                partsWithConsistencyTokens.iterator(), 
partWithConsistencyToken -> {
-            return table.scan(context(), partWithConsistencyToken, rowFactory, 
requiredColumns);
-        });
+                partitions.iterator(), p -> table.scan(context(), p, 
rowFactory, requiredColumns));
 
         return SubscriptionUtils.concat(it);
     }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReader.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReader.java
index f251f4c8b2..f5842cb6b8 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReader.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReader.java
@@ -62,7 +62,9 @@ import org.apache.ignite.internal.sql.engine.util.Commons;
  */
 @SuppressWarnings({"rawtypes", "unchecked"})
 public class RelJsonReader {
-    private static final TypeReference<LinkedHashMap<String, Object>> TYPE_REF 
= new TypeReference<>() {};
+    static final TypeReference<LinkedHashMap<String, Object>> TYPE_REF = new 
TypeReference<>() {};
+
+    private static final Map<String, Object> EMPTY_JSON_RELS = Map.of("rels", 
List.of());
 
     private final ObjectMapper mapper = new 
ObjectMapper().enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
 
@@ -84,6 +86,21 @@ public class RelJsonReader {
         return (T) reader.read(json);
     }
 
+    /** Creates a {@link RexNode} from the given json. */
+    public static RexNode fromExprJson(String json) {
+        try {
+            RelJsonReader reader = new RelJsonReader(null);
+
+            RelInput relInput = reader.newInput(EMPTY_JSON_RELS);
+
+            LinkedHashMap<String, Object> val = reader.mapper.readValue(json, 
TYPE_REF);
+
+            return reader.relJson.toRex(relInput, val);
+        } catch (IOException e) {
+            throw new IgniteInternalException(INTERNAL_ERR, "RelJson 
expression serialization error", e);
+        }
+    }
+
     /**
      * Constructor.
      * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
@@ -120,11 +137,15 @@ public class RelJsonReader {
         String id = (String) jsonRel.get("id");
         String type = (String) jsonRel.get("relOp");
         Function<RelInput, RelNode> factory = relJson.factory(type);
-        RelNode rel = factory.apply(new RelInputImpl(jsonRel));
+        RelNode rel = factory.apply(newInput(jsonRel));
         relMap.put(id, rel);
         lastRel = rel;
     }
 
+    private RelInput newInput(Map<String, Object> jsonRel) {
+        return new RelInputImpl(jsonRel);
+    }
+
     private class RelInputImpl implements RelInputEx {
         private final Map<String, Object> jsonRel;
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonWriter.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonWriter.java
index 73c8c6e485..95cb8de3d9 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonWriter.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonWriter.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.sql.engine.externalize;
 
 import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectWriter;
@@ -30,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.SqlExplainLevel;
 import org.apache.calcite.util.Pair;
 import org.apache.ignite.internal.lang.IgniteInternalException;
@@ -66,6 +68,21 @@ public class RelJsonWriter implements RelWriter {
         return writer.asString();
     }
 
+    /** Converts the given {@link RexNode} to json. */
+    public static String toExprJson(RexNode node) {
+        ObjectMapper mapper = new ObjectMapper();
+
+        try {
+            RelJson relJson = new RelJson();
+
+            Object map = relJson.toJson(node);
+
+            return mapper.writeValueAsString(map);
+        } catch (JsonProcessingException e) {
+            throw new IgniteInternalException(INTERNAL_ERR, "RelJson 
expression serialization error", e);
+        }
+    }
+
     /**
      * Constructor.
      * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPrunerImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPrunerImpl.java
index 3b3385e503..5b74118f54 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPrunerImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPrunerImpl.java
@@ -71,6 +71,17 @@ public class PartitionPrunerImpl implements PartitionPruner {
                 continue;
             }
 
+            // Do not update colocation groups when predicates 
include correlated variables,
+            // because in that case partitions can only be pruned at runtime.
+            boolean containCorrelatedVariables = 
pruningMetadata.data().values()
+                    .stream()
+                    
.anyMatch(PartitionPruningColumns::containCorrelatedVariables);
+
+            if (containCorrelatedVariables) {
+                
updatedFragments.add(mappedFragment.withPartitionPruningMetadata(pruningMetadata));
+                continue;
+            }
+
             // Update fragment by applying PP metadata.
             MappedFragment newFragment = 
updateColocationGroups(mappedFragment, pruningMetadata, dynamicParameters);
 
@@ -161,8 +172,10 @@ public class PartitionPrunerImpl implements 
PartitionPruner {
             ColocationGroup colocationGroup = 
mappedFragment.groupsBySourceId().get(sourceId);
             assert colocationGroup != null : "No colocation group#" + sourceId;
 
-            PartitionPruningPredicate pruningPredicate = new 
PartitionPruningPredicate(table, pruningColumns, dynamicParameters);
-            ColocationGroup newColocationGroup = 
pruningPredicate.prunePartitions(colocationGroup);
+            ColocationGroup newColocationGroup = 
PartitionPruningPredicate.prunePartitions(
+                    table, pruningColumns, dynamicParameters,
+                    colocationGroup
+            );
 
             newColocationGroups.put(sourceId, newColocationGroup);
         }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningColumns.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningColumns.java
index b5e10b2432..8f795316ca 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningColumns.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningColumns.java
@@ -18,13 +18,24 @@
 package org.apache.ignite.internal.sql.engine.prepare.pruning;
 
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.stream.Collectors;
 import org.apache.calcite.rex.RexNode;
+import org.apache.ignite.internal.sql.engine.externalize.RelJsonReader;
+import org.apache.ignite.internal.sql.engine.externalize.RelJsonWriter;
 import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteUnsafeDataInput;
+import org.apache.ignite.internal.util.io.IgniteUnsafeDataOutput;
+import org.apache.ignite.lang.ErrorGroups.Common;
+import org.apache.ignite.lang.IgniteException;
 import org.jetbrains.annotations.TestOnly;
 
 /**
@@ -37,7 +48,9 @@ import org.jetbrains.annotations.TestOnly;
  *
  * @see PartitionPruningMetadataExtractor
  */
-public class PartitionPruningColumns {
+public class PartitionPruningColumns implements Serializable {
+
+    private static final long serialVersionUID = 0;
 
     private final List<Int2ObjectMap<RexNode>> columns;
 
@@ -51,12 +64,23 @@ public class PartitionPruningColumns {
         return columns;
     }
 
+    /** Returns {@code true} if these columns contain correlated variables. */
+    public boolean containCorrelatedVariables() {
+        return columns.stream()
+                .anyMatch(c -> 
c.values().stream().anyMatch(PartitionPruningMetadataExtractor::isCorrelatedVariable));
+    }
+
     /** {@inheritDoc} */
     @Override
     public String toString() {
         return S.toString(PartitionPruningColumns.class, this, "columns", 
columns);
     }
 
+    @SuppressWarnings("unused")
+    private SerializedForm writeReplace() {
+        return new SerializedForm(this);
+    }
+
     /** Returns column values in canonical form. E.g. {@code [1=2, 0=3]} 
becomes {@code [0=3, 1=2]} */
     @TestOnly
     public static List<List<Map.Entry<Integer, RexNode>>> 
canonicalForm(PartitionPruningColumns columns) {
@@ -66,4 +90,69 @@ public class PartitionPruningColumns {
                         .collect(Collectors.toList()))
                 .collect(Collectors.toList());
     }
+
+    /**
+     * Serialization proxy that encodes the pruning columns' {@link RexNode}s as JSON strings.
+     */
+    static class SerializedForm implements Serializable {
+
+        private static final long serialVersionUID = 0;
+
+        private final byte[] bytes;
+
+        private SerializedForm(PartitionPruningColumns columns) {
+            try (IgniteUnsafeDataOutput output = new 
IgniteUnsafeDataOutput(256)) {
+                output.writeInt(columns.columns().size());
+
+                for (Int2ObjectMap<RexNode> columnMap : columns.columns()) {
+                    output.writeInt(columnMap.size());
+
+                    for (Int2ObjectMap.Entry<RexNode> columnValue : 
columnMap.int2ObjectEntrySet()) {
+                        String exprJson = 
RelJsonWriter.toExprJson(columnValue.getValue());
+
+                        output.writeInt(columnValue.getIntKey());
+                        output.writeUTF(exprJson);
+                    }
+                }
+
+                this.bytes = output.array();
+            } catch (IOException e) {
+                throw new IgniteException(Common.INTERNAL_ERR, "Unable to 
serialize partition pruning metadata", e);
+            }
+        }
+
+        protected final Object readResolve() {
+            try (IgniteUnsafeDataInput input = new 
IgniteUnsafeDataInput(bytes)) {
+                return readColumns(input);
+            } catch (IOException e) {
+                throw new IgniteException(Common.INTERNAL_ERR, "Unable to 
deserialize partition pruning metadata", e);
+            }
+        }
+    }
+
+    private static PartitionPruningColumns readColumns(IgniteDataInput input) 
throws IOException {
+        int numColumnSets = input.readInt();
+        List<Int2ObjectMap<RexNode>> result = new ArrayList<>(numColumnSets);
+
+        for (int i = 0; i < numColumnSets; i++) {
+            readExpressions(input, result);
+        }
+
+        return new PartitionPruningColumns(result);
+    }
+
+    private static void readExpressions(IgniteDataInput input, 
List<Int2ObjectMap<RexNode>> output) throws IOException {
+        int numColumns = input.readInt();
+        Int2ObjectMap<RexNode> expr = new Int2ObjectOpenHashMap<>(numColumns);
+
+        for (int i = 0; i < numColumns; i++) {
+            int key = input.readInt();
+            String exprJson = input.readUTF();
+
+            RexNode rexNode = RelJsonReader.fromExprJson(exprJson);
+            expr.put(key, rexNode);
+        }
+
+        output.add(expr);
+    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningMetadata.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningMetadata.java
index b2d87869a5..72ad806aea 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningMetadata.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningMetadata.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.sql.engine.prepare.pruning;
 
 import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
 import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
+import java.io.Serializable;
 import org.apache.ignite.internal.tostring.S;
 import org.jetbrains.annotations.Nullable;
 
@@ -28,7 +29,9 @@ import org.jetbrains.annotations.Nullable;
  * @see PartitionPruningColumns
  * @see PartitionPruningMetadataExtractor
  */
-public class PartitionPruningMetadata {
+public class PartitionPruningMetadata implements Serializable {
+
+    private static final long serialVersionUID = 0;
 
     /** Empty metadata. */
     public static final PartitionPruningMetadata EMPTY = new 
PartitionPruningMetadata(Long2ObjectMaps.emptyMap());
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningMetadataExtractor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningMetadataExtractor.java
index 81e1ec7bfb..9d02293bc6 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningMetadataExtractor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningMetadataExtractor.java
@@ -33,6 +33,7 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexDynamicParam;
+import org.apache.calcite.rex.RexFieldAccess;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexLocalRef;
 import org.apache.calcite.rex.RexNode;
@@ -466,7 +467,21 @@ public class PartitionPruningMetadataExtractor extends 
IgniteRelShuttle {
     }
 
     private static boolean isValueExpr(RexNode node) {
-        return node instanceof RexLiteral || node instanceof RexDynamicParam;
+        return node instanceof RexLiteral || node instanceof RexDynamicParam 
|| isCorrelatedVariable(node);
+    }
+
+    static boolean isCorrelatedVariable(RexNode node) {
+        // Correlated variables are referenced via field access expressions
+        //
+        // SELECT * FROM t1 as cor WHERE EXISTS (SELECT 1 FROM t2 WHERE t2.c1 
= cor.c1)
+        //
+        // So condition `t2.c1 = cor.c1` is translated to $t0 = $cor0.C1
+        if (node.isA(SqlKind.FIELD_ACCESS)) {
+            RexFieldAccess fieldAccess = (RexFieldAccess) node;
+            return fieldAccess.getReferenceExpr().isA(SqlKind.CORREL_VARIABLE);
+        } else {
+            return false;
+        }
     }
 
     /** Intermediate result of extracting partition pruning metadata. */
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningPredicate.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningPredicate.java
index 090fa0b097..d26ae19edd 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningPredicate.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningPredicate.java
@@ -41,9 +41,12 @@ import org.apache.calcite.util.ImmutableIntList;
 import org.apache.calcite.util.TimestampString;
 import org.apache.ignite.internal.sql.engine.exec.NodeWithConsistencyToken;
 import 
org.apache.ignite.internal.sql.engine.exec.PartitionWithConsistencyToken;
+import org.apache.ignite.internal.sql.engine.exec.exp.ExpressionFactory;
 import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup;
+import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
 import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
 import org.apache.ignite.internal.sql.engine.schema.PartitionCalculator;
+import org.apache.ignite.internal.sql.engine.util.TypeUtils;
 import org.apache.ignite.internal.type.NativeType;
 import org.apache.ignite.internal.type.NativeTypeSpec;
 import org.jetbrains.annotations.Nullable;
@@ -55,34 +58,28 @@ public final class PartitionPruningPredicate {
 
     private static final ZoneId ZONE_ID_UTC = ZoneId.of("UTC");
 
-    private final int tablePartitions;
-
-    // TODO: https://issues.apache.org/jira/browse/IGNITE-21543 Remove after 
is resolved,
-    //  remaining partitions should always be not null.
-    private final @Nullable IntSet remainingPartitions;
-
-    /**
-     * Constructor.
-     *
-     * @param table Table.
-     * @param pruningColumns Columns.
-     * @param dynamicParameters Values of dynamic parameters.
-     */
-    public PartitionPruningPredicate(IgniteTable table, 
PartitionPruningColumns pruningColumns, Object[] dynamicParameters) {
-        this.tablePartitions = table.partitions();
-        this.remainingPartitions = computeRemainingPartitions(table, 
pruningColumns, dynamicParameters);
+    private PartitionPruningPredicate() {
     }
 
     /**
      * Applies partition pruning to the given colocation group. This group 
should have the same number of assignments as the source table.
      *
+     * @param table Table.
+     * @param pruningColumns Partition pruning metadata.
+     * @param dynamicParameters Values of dynamic parameters.
      * @param colocationGroup Colocation group.
      *
      * @return New colocation group.
      */
-    public ColocationGroup prunePartitions(ColocationGroup colocationGroup) {
-        assert tablePartitions == colocationGroup.assignments().size() : 
"Number of partitions does not match";
+    public static ColocationGroup prunePartitions(
+            IgniteTable table,
+            PartitionPruningColumns pruningColumns,
+            Object[] dynamicParameters,
+            ColocationGroup colocationGroup) {
 
+        assert table.partitions() == colocationGroup.assignments().size() : 
"Number of partitions does not match";
+
+        IntSet remainingPartitions = computeRemainingPartitions(table, 
pruningColumns, dynamicParameters);
         if (remainingPartitions == null) {
             return colocationGroup;
         }
@@ -121,6 +118,51 @@ public final class PartitionPruningPredicate {
         );
     }
 
+    /**
+     * Applies partition pruning to the list of given assignments and returns 
a list of partitions belonging to the given node.
+     *
+     * @param pruningColumns Partition pruning metadata.
+     * @param table Table.
+     * @param expressionFactory Expression factory.
+     * @param assignments Assignments.
+     * @param nodeName Node name.
+     *
+     * @return List of partitions that belong to the provided node.
+     */
+    public static <RowT> List<PartitionWithConsistencyToken> prunePartitions(
+            PartitionPruningColumns pruningColumns,
+            IgniteTable table,
+            ExpressionFactory<RowT> expressionFactory,
+            List<NodeWithConsistencyToken> assignments,
+            String nodeName
+    ) {
+        ImmutableIntList keys = table.distribution().getKeys();
+        PartitionCalculator partitionCalculator = 
table.partitionCalculator().get();
+        List<PartitionWithConsistencyToken> result = new ArrayList<>();
+
+        for (Int2ObjectMap<RexNode> columns : pruningColumns.columns()) {
+            for (int key : keys) {
+                RexNode node = columns.get(key);
+                ColumnDescriptor descriptor = 
table.descriptor().columnDescriptor(key);
+                NativeType physicalType = descriptor.physicalType();
+
+                Object valueInInternalForm = 
expressionFactory.execute(node).get();
+                Class<?> storageType = 
NativeTypeSpec.toClass(physicalType.spec(), descriptor.nullable());
+                Object value = TypeUtils.fromInternal(valueInInternalForm, 
storageType);
+
+                partitionCalculator.append(value);
+            }
+
+            int p = partitionCalculator.partition();
+            NodeWithConsistencyToken token = assignments.get(p);
+
+            if (nodeName.equals(token.name())) {
+                result.add(new PartitionWithConsistencyToken(p, 
token.enlistmentConsistencyToken()));
+            }
+        }
+
+        return result;
+    }
 
     private static @Nullable IntSet computeRemainingPartitions(
             IgniteTable table,
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/PartitionProvidersTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/PartitionProvidersTest.java
new file mode 100644
index 0000000000..5ae3ff918c
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/PartitionProvidersTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.ignite.internal.network.ClusterNodeImpl;
+import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup;
+import org.apache.ignite.internal.sql.engine.exec.mapping.FragmentDescription;
+import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
+import 
org.apache.ignite.internal.sql.engine.prepare.pruning.PartitionPruningColumns;
+import 
org.apache.ignite.internal.sql.engine.prepare.pruning.PartitionPruningMetadata;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.type.NativeTypes;
+import org.apache.ignite.network.NetworkAddress;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/** Tests for {@link PartitionProvider} implementations. */
+@ExtendWith(MockitoExtension.class)
+public class PartitionProvidersTest extends BaseIgniteAbstractTest {
+
+    private static final long GROUP_ID = 1;
+
+    @Mock
+    private QueryTaskExecutor queryTaskExecutor;
+
+    @Test
+    public void testStaticPartitionProviderWithProvidedPartitions() {
+        List<NodeWithConsistencyToken> assignments = List.of(
+                new NodeWithConsistencyToken("n1", 0),
+                new NodeWithConsistencyToken("n2", 0),
+                new NodeWithConsistencyToken("n3", 0)
+        );
+
+        Map<String, List<PartitionWithConsistencyToken>> partitionsPerNode = 
Map.of(
+                "n1", List.of(new PartitionWithConsistencyToken(1, 1L)),
+                "n2", List.of(new PartitionWithConsistencyToken(2, 2L)),
+                "n3", List.of(new PartitionWithConsistencyToken(3, 3L))
+        );
+
+        IgniteTable table = TestBuilders.table()
+                .name("T1")
+                .addKeyColumn("C1", NativeTypes.INT32)
+                .partitions(42)
+                .distribution(IgniteDistributions.affinity(List.of(0), 2, "3"))
+                .partitions(assignments.size())
+                .build();
+
+        ColocationGroup group = new ColocationGroup(
+                List.of(1L, 2L), List.of("n1", "n2", "n3"),
+                assignments,
+                partitionsPerNode
+        );
+
+        {
+            ExecutionContext<Object[]> ctx = newContext("n1", group, null);
+            expectPartitions(ctx, table, List.of(
+                    new PartitionWithConsistencyToken(1, 1L)
+            ));
+        }
+
+        {
+            ExecutionContext<Object[]> ctx = newContext("n2", group, null);
+            expectPartitions(ctx, table, List.of(
+                    new PartitionWithConsistencyToken(2, 2L)
+            ));
+        }
+
+        {
+            ExecutionContext<Object[]> ctx = newContext("n3", group, null);
+            expectPartitions(ctx, table, List.of(
+                    new PartitionWithConsistencyToken(3, 3L)
+            ));
+        }
+    }
+
+    @Test
+    public void 
testStaticPartitionProviderUsesAssignmentsWhenNoPartitionsPresent() {
+        List<NodeWithConsistencyToken> assignments = List.of(
+                new NodeWithConsistencyToken("n1", 0),
+                new NodeWithConsistencyToken("n2", 1),
+                new NodeWithConsistencyToken("n1", 2),
+                new NodeWithConsistencyToken("n1", 3),
+                new NodeWithConsistencyToken("n3", 4)
+        );
+
+        IgniteTable table = TestBuilders.table()
+                .name("T1")
+                .addKeyColumn("C1", NativeTypes.INT32)
+                .partitions(assignments.size())
+                .distribution(IgniteDistributions.affinity(List.of(0), 2, "3"))
+                .partitions(assignments.size())
+                .build();
+
+        ColocationGroup group = new ColocationGroup(
+                List.of(1L, 2L), List.of("n1", "n2", "n3"),
+                assignments
+        );
+
+        {
+            ExecutionContext<Object[]> ctx = newContext("n1", group, null);
+            expectPartitions(ctx, table,
+                    List.of(
+                            new PartitionWithConsistencyToken(0, 0L),
+                            new PartitionWithConsistencyToken(2, 2L),
+                            new PartitionWithConsistencyToken(3, 3L)
+                    )
+            );
+        }
+
+        {
+            ExecutionContext<Object[]> ctx = newContext("n2", group, null);
+            expectPartitions(ctx, table, List.of(new 
PartitionWithConsistencyToken(1, 1L)));
+        }
+
+        {
+            ExecutionContext<Object[]> ctx = newContext("n3", group, null);
+            expectPartitions(ctx, table, List.of(new 
PartitionWithConsistencyToken(4, 4L)));
+        }
+    }
+
+    @Test
+    public void testDynamicPartitionProvider() {
+        List<NodeWithConsistencyToken> assignments = List.of(
+                new NodeWithConsistencyToken("n1", 0),
+                new NodeWithConsistencyToken("n2", 1),
+                new NodeWithConsistencyToken("n1", 2),
+                new NodeWithConsistencyToken("n1", 3),
+                new NodeWithConsistencyToken("n3", 4)
+        );
+
+        IgniteTable table = TestBuilders.table()
+                .name("T1")
+                .addKeyColumn("C1", NativeTypes.INT32)
+                .partitions(assignments.size())
+                .distribution(IgniteDistributions.affinity(List.of(0), 2, "3"))
+                .partitions(assignments.size())
+                .build();
+
+        ColocationGroup group = new ColocationGroup(
+                List.of(1L, 2L), List.of("n1", "n2", "n3"),
+                assignments,
+                Map.of()
+        );
+
+        PartitionPruningMetadata metadata = newMetadata(1);
+
+        {
+            ExecutionContext<Object[]> ctx = newContext("n1", group, metadata);
+            expectPartitions(ctx, table, List.of());
+        }
+
+        {
+            ExecutionContext<Object[]> ctx = newContext("n2", group, metadata);
+            expectPartitions(ctx, table, List.of(new 
PartitionWithConsistencyToken(1, 1L)));
+        }
+
+        {
+            ExecutionContext<Object[]> ctx = newContext("n3", group, metadata);
+            expectPartitions(ctx, table, List.of());
+        }
+    }
+
+    private ExecutionContext<Object[]> newContext(
+            String nodeName,
+            ColocationGroup colocationGroup,
+            @Nullable PartitionPruningMetadata metadata
+    ) {
+        ClusterNodeImpl node = new ClusterNodeImpl(nodeName, nodeName, new 
NetworkAddress("localhost", 1234));
+
+        Long2ObjectMap<ColocationGroup> map = new Long2ObjectOpenHashMap<>();
+        map.put(GROUP_ID, colocationGroup);
+
+        return TestBuilders.executionContext()
+                .queryId(UUID.randomUUID())
+                .executor(queryTaskExecutor)
+                .fragment(new FragmentDescription(1, false, map, null, null, 
metadata))
+                .localNode(node)
+                .build();
+    }
+
+    private static PartitionPruningMetadata newMetadata(long sourceId) {
+        RexNode expr = Commons.rexBuilder().makeLiteral(1, 
Commons.typeFactory().createSqlType(SqlTypeName.INTEGER));
+        PartitionPruningColumns columns = new 
PartitionPruningColumns(List.of(Int2ObjectMaps.singleton(0, expr)));
+
+        return new 
PartitionPruningMetadata(Long2ObjectMaps.singleton(sourceId, columns));
+    }
+
+    private static void expectPartitions(
+            ExecutionContext<Object[]> ctx,
+            IgniteTable table,
+            List<PartitionWithConsistencyToken> expected
+    ) {
+        ColocationGroup group = ctx.group(GROUP_ID);
+        assertNotNull(group, "no group does not exist: " + GROUP_ID);
+
+        PartitionProvider<Object[]> partitionProvider = 
ctx.getPartitionProvider(1, group, table);
+
+        List<PartitionWithConsistencyToken> actual = 
partitionProvider.getPartitions(ctx);
+        assertEquals(expected, actual, "Node: " + ctx.localNode().name());
+    }
+}
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImplTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImplTest.java
index de08c93b80..e51b16ed61 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImplTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImplTest.java
@@ -94,7 +94,7 @@ public class ExpressionFactoryImplTest extends 
BaseIgniteAbstractTest {
         typeFactory = Commons.typeFactory();
 
         FragmentDescription fragmentDescription = new FragmentDescription(1, 
true,
-                Long2ObjectMaps.emptyMap(), null, null);
+                Long2ObjectMaps.emptyMap(), null, null, null);
 
         ExecutionContext<Object[]> ctx = TestBuilders.executionContext()
                 .queryId(UUID.randomUUID())
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java
index 3f2d891a12..569f2864ff 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java
@@ -209,6 +209,10 @@ public class FragmentMappingTest extends 
AbstractPlannerTest {
         addTableIdent("NT", "N1");
         addTableIdent("NT", "N2");
 
+        addTable("T1", "N0", "N1", "N2");
+        addTable("T2", "N0", "N1", "N2");
+        addTable("T3", "N0", "N1", "N2");
+
         testRunner.runTest(this::initSchema, "correlated.test");
     }
 
@@ -229,7 +233,7 @@ public class FragmentMappingTest extends 
AbstractPlannerTest {
 
     @Test
     public void testPartitionPruning() {
-        addNodes("N1", "N2", "N3", "N4", "N5");
+        addNodes("N0", "N1", "N2", "N3", "N4", "N5");
 
         addTable("T1", "N1", "N2", "N3");
         addTable("T2", "N4", "N5");
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentPrinter.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentPrinter.java
index 25116f531c..4577a1eaca 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentPrinter.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentPrinter.java
@@ -26,12 +26,15 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import org.apache.calcite.rex.RexNode;
 import 
org.apache.ignite.internal.sql.engine.exec.PartitionWithConsistencyToken;
 import org.apache.ignite.internal.sql.engine.prepare.Fragment;
 import org.apache.ignite.internal.sql.engine.prepare.IgniteRelShuttle;
+import 
org.apache.ignite.internal.sql.engine.prepare.pruning.PartitionPruningMetadata;
 import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
 import org.apache.ignite.internal.sql.engine.rel.IgniteReceiver;
 import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
@@ -196,6 +199,25 @@ final class FragmentPrinter extends IgniteRelShuttle {
             output.writeNewline();
         }
 
+        PartitionPruningMetadata pruningMetadata = 
mappedFragment.partitionPruningMetadata();
+        if (pruningMetadata != null) {
+            output.appendPadding();
+            output.writeKeyValue("pruningMetadata", 
pruningMetadata.data().long2ObjectEntrySet()
+                    .stream()
+                    .map(e -> {
+                        List<Map<Integer, RexNode>> columns = 
e.getValue().columns().stream()
+                                .map(TreeMap::new)
+                                .collect(Collectors.toList());
+
+                        return Map.entry(e.getLongKey(), columns);
+                    })
+                    .sorted(Entry.comparingByKey())
+                    .collect(Collectors.toList())
+                    .toString()
+            );
+            output.writeNewline();
+        }
+
         output.appendPadding();
         output.writeString("tree:");
         output.writeNewline();
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
index 2fae8e9bfb..804a67a9b1 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
@@ -123,7 +123,7 @@ public abstract class AbstractExecutionTest<T> extends 
IgniteAbstractTest {
     }
 
     protected FragmentDescription getFragmentDescription() {
-        return new FragmentDescription(0, true, Long2ObjectMaps.emptyMap(), 
null, null);
+        return new FragmentDescription(0, true, Long2ObjectMaps.emptyMap(), 
null, null, null);
     }
 
     protected Object[] row(Object... fields) {
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
index 36180890e1..a800b03760 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
@@ -499,7 +499,7 @@ public class ExchangeExecutionTest extends 
AbstractExecutionTest<Object[]> {
         ExecutionContext<Object[]> targetCtx = TestBuilders.executionContext()
                 .queryId(queryId)
                 .executor(taskExecutor)
-                .fragment(new FragmentDescription(TARGET_FRAGMENT_ID, true, 
Long2ObjectMaps.emptyMap(), null, null))
+                .fragment(new FragmentDescription(TARGET_FRAGMENT_ID, true, 
Long2ObjectMaps.emptyMap(), null, null, null))
                 .localNode(localNode)
                 .build();
 
@@ -543,7 +543,7 @@ public class ExchangeExecutionTest extends 
AbstractExecutionTest<Object[]> {
         ExecutionContext<Object[]> sourceCtx = TestBuilders.executionContext()
                 .queryId(queryId)
                 .executor(taskExecutor)
-                .fragment(new FragmentDescription(SOURCE_FRAGMENT_ID, true, 
Long2ObjectMaps.emptyMap(), null, null))
+                .fragment(new FragmentDescription(SOURCE_FRAGMENT_ID, true, 
Long2ObjectMaps.emptyMap(), null, null, null))
                 .localNode(localNode)
                 .build();
 
@@ -575,7 +575,7 @@ public class ExchangeExecutionTest extends 
AbstractExecutionTest<Object[]> {
         ExecutionContext<Object[]> sourceCtx = TestBuilders.executionContext()
                 .queryId(queryId)
                 .executor(taskExecutor)
-                .fragment(new FragmentDescription(SOURCE_FRAGMENT_ID, true, 
Long2ObjectMaps.emptyMap(), null, null))
+                .fragment(new FragmentDescription(SOURCE_FRAGMENT_ID, true, 
Long2ObjectMaps.emptyMap(), null, null, null))
                 .localNode(localNode)
                 .build();
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java
index f980bfceb1..e86e62369f 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java
@@ -39,6 +39,7 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory.Builder;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.PartitionProvider;
 import 
org.apache.ignite.internal.sql.engine.exec.PartitionWithConsistencyToken;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
@@ -208,8 +209,9 @@ public class IndexScanNodeExecutionTest extends 
AbstractExecutionTest<Object[]>
         RowFactory<Object[]> rowFactory = ctx.rowHandler().factory(rowSchema);
         SingleRangeIterable<Object[]> conditions = new 
SingleRangeIterable<>(new Object[]{}, null, false, false);
         List<PartitionWithConsistencyToken> partitions = 
scannableTable.getPartitions();
+        PartitionProvider<Object[]> partitionProvider = 
PartitionProvider.fromPartitions(partitions);
 
-        return new IndexScanNode<>(ctx, rowFactory, indexDescriptor, 
scannableTable, tableDescriptor, partitions,
+        return new IndexScanNode<>(ctx, rowFactory, indexDescriptor, 
scannableTable, tableDescriptor, partitionProvider,
                 comparator, conditions, null, null, null);
     }
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNodeExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNodeExecutionTest.java
index 001e508f01..a81535c525 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNodeExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNodeExecutionTest.java
@@ -285,6 +285,6 @@ public class ModifyNodeExecutionTest extends 
AbstractExecutionTest<RowWrapper> {
     @Override
     protected FragmentDescription getFragmentDescription() {
         ColocationGroup colocationGroup = new ColocationGroup(List.of(), 
List.of(), List.of());
-        return new FragmentDescription(0, true, 
Long2ObjectMaps.singleton(SOURCE_ID, colocationGroup), null, null);
+        return new FragmentDescription(0, true, 
Long2ObjectMaps.singleton(SOURCE_ID, colocationGroup), null, null, null);
     }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
index 6d77d603a9..b24e441924 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowEx;
 import org.apache.ignite.internal.schema.BinaryTuplePrefix;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.PartitionProvider;
 import 
org.apache.ignite.internal.sql.engine.exec.PartitionWithConsistencyToken;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
@@ -185,8 +186,9 @@ public class TableScanNodeExecutionTest extends 
AbstractExecutionTest<Object[]>
                 }
             };
             ScannableTableImpl scanableTable = new 
ScannableTableImpl(internalTable, rf -> rowConverter);
+            PartitionProvider<Object[]> partitionProvider = 
PartitionProvider.fromPartitions(partsWithConsistencyTokens);
             TableScanNode<Object[]> scanNode = new TableScanNode<>(ctx, 
rowFactory, scanableTable,
-                    partsWithConsistencyTokens, null, null, null);
+                    partitionProvider, null, null, null);
 
             RootNode<Object[]> root = new RootNode<>(ctx);
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index 040f70f24e..6fc2dafefd 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -101,6 +101,7 @@ import 
org.apache.ignite.internal.sql.engine.schema.TableDescriptorImpl;
 import org.apache.ignite.internal.sql.engine.sql.ParserServiceImpl;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.sql.engine.util.EmptyCacheFactory;
 import org.apache.ignite.internal.sql.engine.util.cache.CaffeineCacheFactory;
 import org.apache.ignite.internal.systemview.SystemViewManagerImpl;
@@ -113,6 +114,7 @@ import org.apache.ignite.internal.type.NativeTypeSpec;
 import org.apache.ignite.internal.type.NumberNativeType;
 import org.apache.ignite.internal.type.TemporalNativeType;
 import org.apache.ignite.internal.type.VarlenNativeType;
+import org.apache.ignite.internal.util.ArrayUtils;
 import org.apache.ignite.internal.util.SubscriptionUtils;
 import org.apache.ignite.internal.util.TransformingIterator;
 import org.apache.ignite.internal.util.subscription.TransformingPublisher;
@@ -450,6 +452,9 @@ public class TestBuilders {
         /** Sets the node this fragment will be executed on. */
         ExecutionContextBuilder localNode(ClusterNode node);
 
+        /** Sets the dynamic parameters this fragment will be executed with. */
+        ExecutionContextBuilder dynamicParameters(Object... params);
+
         /**
          * Builds the context object.
          *
@@ -459,11 +464,12 @@ public class TestBuilders {
     }
 
     private static class ExecutionContextBuilderImpl implements 
ExecutionContextBuilder {
-        private FragmentDescription description = new FragmentDescription(0, 
true, null, null, null);
+        private FragmentDescription description = new FragmentDescription(0, 
true, null, null, null, null);
 
         private UUID queryId = null;
         private QueryTaskExecutor executor = null;
         private ClusterNode node = null;
+        private Object[] dynamicParams = ArrayUtils.OBJECT_EMPTY_ARRAY;
 
         /** {@inheritDoc} */
         @Override
@@ -497,6 +503,12 @@ public class TestBuilders {
             return this;
         }
 
+        @Override
+        public ExecutionContextBuilder dynamicParameters(Object... params) {
+            this.dynamicParams = params;
+            return this;
+        }
+
         /** {@inheritDoc} */
         @Override
         public ExecutionContext<Object[]> build() {
@@ -507,7 +519,7 @@ public class TestBuilders {
                     node.name(),
                     description,
                     ArrayRowHandler.INSTANCE,
-                    Map.of(),
+                    Commons.parametersMap(dynamicParams),
                     TxAttributes.fromTx(new NoOpTransaction(node.name())),
                     SqlQueryProcessor.DEFAULT_TIME_ZONE_ID
             );
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PartitionPruningTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PartitionPruningTest.java
index 945fa74465..192a31e80d 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PartitionPruningTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PartitionPruningTest.java
@@ -190,6 +190,32 @@ public class PartitionPruningTest extends 
AbstractPlannerTest {
         assertEquals(SqlTypeName.VARCHAR, lit2.getType().getSqlTypeName());
     }
 
+    @Test
+    public void testCorrelatedQuery() throws Exception {
+        IgniteTable table1 = TestBuilders.table()
+                .name("T1")
+                .addKeyColumn("C1", NativeTypes.INT32)
+                .addColumn("C2", NativeTypes.INT32, false)
+                .distribution(IgniteDistributions.affinity(List.of(0), 1, 2))
+                .build();
+
+        IgniteTable table2 = TestBuilders.table()
+                .name("T2")
+                .addKeyColumn("C1", NativeTypes.INT32)
+                .addColumn("C2", NativeTypes.INT32)
+                .distribution(IgniteDistributions.affinity(List.of(0), 1, 2))
+                .build();
+
+        PartitionPruningMetadataExtractor extractor = new 
PartitionPruningMetadataExtractor();
+
+        PartitionPruningMetadata actual = extractMetadata(extractor,
+                "SELECT * FROM t1 as cor WHERE EXISTS (SELECT 1 FROM t2 WHERE 
t2.c1 = cor.c1 OR t2.c1=42)", table1, table2);
+
+        PartitionPruningColumns cols = actual.get(2);
+        assertNotNull(cols, "No metadata for source=2");
+        assertEquals("[[0=$cor0.C1], [0=42]]", 
PartitionPruningColumns.canonicalForm(cols).toString());
+    }
+
     private PartitionPruningMetadata extractMetadata(String query, 
IgniteTable... table) throws Exception {
         PartitionPruningMetadataExtractor extractor = new 
PartitionPruningMetadataExtractor();
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningPredicateSelfTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningPredicateSelfTest.java
index bd2b15945d..ca29b0d4ef 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningPredicateSelfTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningPredicateSelfTest.java
@@ -30,6 +30,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -42,8 +43,12 @@ import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.util.DateString;
 import org.apache.calcite.util.TimeString;
 import org.apache.calcite.util.TimestampString;
+import org.apache.ignite.internal.network.ClusterNodeImpl;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.sql.engine.exec.NodeWithConsistencyToken;
 import 
org.apache.ignite.internal.sql.engine.exec.PartitionWithConsistencyToken;
+import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
+import org.apache.ignite.internal.sql.engine.exec.exp.ExpressionFactory;
 import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup;
 import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
 import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
@@ -60,11 +65,13 @@ import org.apache.ignite.internal.sql.engine.util.TypeUtils;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.type.NativeType;
 import org.apache.ignite.internal.type.NativeTypes;
+import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.sql.ColumnType;
 import org.junit.jupiter.api.Assumptions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.Mockito;
 
 /**
  * Tests for {@link PartitionPruningPredicate}.
@@ -100,7 +107,7 @@ public class PartitionPruningPredicateSelfTest extends 
BaseIgniteAbstractTest {
 
         IgniteDistribution distribution = 
IgniteDistributions.affinity(List.of(0), 1, 1);
 
-        NativeType nativeType = TypeUtils.columnType2NativeType(columnType, 2, 
2, 2);
+        NativeType nativeType = TypeUtils.columnType2NativeType(columnType, 4, 
2, 4);
 
         IgniteTable table = TestBuilders.table()
                 .name("T")
@@ -115,13 +122,12 @@ public class PartitionPruningPredicateSelfTest extends 
BaseIgniteAbstractTest {
         RexNode expr = generateLiteralOrValueExpr(columnType, val);
 
         PartitionPruningColumns columns = new 
PartitionPruningColumns(List.of(Int2ObjectMaps.singleton(fieldIndex, expr)));
-        PartitionPruningPredicate predicate = new 
PartitionPruningPredicate(table, columns, new Object[0]);
 
         List<String> nodeNames = List.of("n1", "n2", "n3");
         List<NodeWithConsistencyToken> assignments = randomAssignments(table, 
nodeNames);
         ColocationGroup group = new ColocationGroup(List.of(0L), nodeNames, 
assignments);
 
-        expectPartitionsPruned(table, predicate, group, val);
+        expectPartitionsPruned(table, columns, new Object[0], group, val);
     }
 
     @ParameterizedTest
@@ -129,7 +135,7 @@ public class PartitionPruningPredicateSelfTest extends 
BaseIgniteAbstractTest {
     public void testDynamicParam(ColumnType columnType) {
         IgniteDistribution distribution = 
IgniteDistributions.affinity(List.of(0), 1, 1);
 
-        NativeType nativeType = TypeUtils.columnType2NativeType(columnType, 2, 
2, 2);
+        NativeType nativeType = TypeUtils.columnType2NativeType(columnType, 4, 
2, 4);
 
         IgniteTable table = TestBuilders.table()
                 .name("T")
@@ -145,18 +151,18 @@ public class PartitionPruningPredicateSelfTest extends 
BaseIgniteAbstractTest {
         Object[] dynamicParameters = {val};
 
         PartitionPruningColumns columns = new 
PartitionPruningColumns(List.of(Int2ObjectMaps.singleton(fieldIndex, expr)));
-        PartitionPruningPredicate predicate = new 
PartitionPruningPredicate(table, columns, dynamicParameters);
 
         List<String> nodeNames = List.of("n1", "n2", "n3");
         List<NodeWithConsistencyToken> assignments = randomAssignments(table, 
nodeNames);
         ColocationGroup group = new ColocationGroup(List.of(0L), nodeNames, 
assignments);
 
-        expectPartitionsPruned(table, predicate, group, val);
+        expectPartitionsPruned(table, columns, dynamicParameters, group, val);
     }
 
     private static void expectPartitionsPruned(
             IgniteTable table,
-            PartitionPruningPredicate predicate,
+            PartitionPruningColumns pruningColumns,
+            Object[] dynamicParameters,
             ColocationGroup group,
             Object... values
     ) {
@@ -166,7 +172,7 @@ public class PartitionPruningPredicateSelfTest extends 
BaseIgniteAbstractTest {
         PartitionWithConsistencyToken expectedPartition = 
computeExpectedPartition(table, group.assignments(), values);
 
         // Apply partition pruning to obtain new colocation group.
-        ColocationGroup newGroup = predicate.prunePartitions(group);
+        ColocationGroup newGroup = 
PartitionPruningPredicate.prunePartitions(table, pruningColumns, 
dynamicParameters, group);
 
         String expectedNode = 
assignments.get(expectedPartition.partId()).name();
 
@@ -184,7 +190,28 @@ public class PartitionPruningPredicateSelfTest extends 
BaseIgniteAbstractTest {
             actual.put(nodeName, actualPartitions);
         }
 
-        assertEquals(expected, actual, "partitions per node");
+        assertEquals(expected, actual, "partitions per node (static)");
+
+        // Ensure both implementations of prunePartitions produce the same 
result.
+
+        Map<String, List<PartitionWithConsistencyToken>> dynamicActual = new 
HashMap<>();
+
+        for (String nodeName : group.nodeNames()) {
+            ExecutionContext<Object[]> ctx = TestBuilders.executionContext()
+                    .queryId(UUID.randomUUID())
+                    .localNode(new ClusterNodeImpl(nodeName, nodeName, new 
NetworkAddress("localhost", 123)))
+                    .executor(Mockito.mock(QueryTaskExecutor.class))
+                    .dynamicParameters(dynamicParameters)
+                    .build();
+            ExpressionFactory<Object[]> expressionFactory = 
ctx.expressionFactory();
+
+            List<PartitionWithConsistencyToken> result = 
PartitionPruningPredicate.prunePartitions(
+                    pruningColumns, table, expressionFactory, assignments, 
nodeName
+            );
+            dynamicActual.put(nodeName, result);
+        }
+
+        assertEquals(expected, dynamicActual, "partitions per node (dynamic)");
     }
 
     private static PartitionWithConsistencyToken computeExpectedPartition(
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PruningMetadataSerializationTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PruningMetadataSerializationTest.java
new file mode 100644
index 0000000000..2033f6dbe9
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PruningMetadataSerializationTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.prepare.pruning;
+
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
+import 
org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
+import 
org.apache.ignite.internal.network.serialization.marshal.DefaultUserObjectMarshaller;
+import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests serialization of {@link PartitionPruningMetadata}.
+ */
+public class PruningMetadataSerializationTest extends BaseIgniteAbstractTest {
+
+    private final ClassDescriptorRegistry descriptorRegistry = new 
ClassDescriptorRegistry();
+
+    private final ClassDescriptorFactory descriptorFactory = new 
ClassDescriptorFactory(descriptorRegistry);
+
+    private final DefaultUserObjectMarshaller marshaller = new 
DefaultUserObjectMarshaller(descriptorRegistry, descriptorFactory);
+
+    @Test
+    public void test() throws Exception {
+
+
+        RexBuilder rexBuilder = Commons.rexBuilder();
+        RexNode lit1 = rexBuilder.makeLiteral("1");
+        RexNode lit2 = rexBuilder.makeLiteral(true);
+
+
+        Long2ObjectMap<PartitionPruningColumns> map = new 
Long2ObjectOpenHashMap<>();
+
+        map.put(1, new PartitionPruningColumns(List.of()));
+        map.put(2, new PartitionPruningColumns(List.of(
+                Int2ObjectMaps.singleton(0, lit1)
+        )));
+
+        Int2ObjectMap<RexNode> cols = new Int2ObjectOpenHashMap<>();
+        cols.put(0, lit2);
+        cols.put(1, lit1);
+        map.put(3, new PartitionPruningColumns(List.of(cols)));
+
+        PartitionPruningMetadata metadata = new PartitionPruningMetadata(map);
+
+        byte[] bytes = marshaller.marshal(metadata).bytes();
+
+        PartitionPruningMetadata fromBytes = marshaller.unmarshal(bytes, 
descriptorRegistry);
+        assertNotNull(fromBytes, "Deserialized to null");
+
+        expectColumns(fromBytes, 1, List.of());
+        expectColumns(fromBytes, 2, List.of(Map.of(0, lit1)));
+        expectColumns(fromBytes, 3, List.of(Map.of(0, lit2, 1, lit1)));
+    }
+
+    private static void expectColumns(
+            PartitionPruningMetadata metadata,
+            long sourceId,
+            List<Map<Integer, RexNode>> cols
+    ) {
+        PartitionPruningColumns columns = metadata.get(sourceId);
+        assertNotNull(columns, format("No metadata for source#{}: {}", 
sourceId, metadata));
+
+        assertEquals(cols, columns.columns(), format("Metadata for source#{} 
does not match", sourceId));
+    }
+}
diff --git a/modules/sql-engine/src/test/resources/mapping/correlated.test 
b/modules/sql-engine/src/test/resources/mapping/correlated.test
index 584c9f903b..09750faf36 100644
--- a/modules/sql-engine/src/test/resources/mapping/correlated.test
+++ b/modules/sql-engine/src/test/resources/mapping/correlated.test
@@ -199,3 +199,138 @@ Fragment#1
     Sender(targetFragment=0, exchange=1, distribution=single)
       TableScan(name=PUBLIC.T_N1, source=3, partitions=1, 
distribution=affinity[table: T_N1, columns: [ID]])
 ---
+# Pass partition pruning metadata to correlated joins.
+N0
+SELECT * FROM t1_n0n1n2 as cor WHERE EXISTS (SELECT 1 FROM t2_n0n1n2 as t2 
WHERE t2.id = cor.id)
+---
+Fragment#0 root
+  executionNodes: [N0]
+  remoteFragments: [1, 2]
+  exchangeSourceNodes: {1=[N0, N1, N2], 2=[N0, N1, N2]}
+  tree:
+    Project
+      CorrelatedNestedLoopJoin
+        Receiver(sourceFragment=1, exchange=1, distribution=single)
+        ColocatedHashAggregate
+          Receiver(sourceFragment=2, exchange=2, distribution=single)
+
+Fragment#2 correlated
+  targetNodes: [N0]
+  executionNodes: [N0, N1, N2]
+  tables: [T2_N0N1N2]
+  partitions: {N0=[0:3], N1=[1:3], N2=[2:3]}
+  pruningMetadata: [3=[{0=$cor0.ID}]]
+  tree:
+    Sender(targetFragment=0, exchange=2, distribution=single)
+      TableScan(name=PUBLIC.T2_N0N1N2, source=3, partitions=3, 
distribution=random)
+
+Fragment#1
+  targetNodes: [N0]
+  executionNodes: [N0, N1, N2]
+  tables: [T1_N0N1N2]
+  partitions: {N0=[0:3], N1=[1:3], N2=[2:3]}
+  tree:
+    Sender(targetFragment=0, exchange=1, distribution=single)
+      TableScan(name=PUBLIC.T1_N0N1N2, source=4, partitions=3, 
distribution=affinity[table: T1_N0N1N2, columns: [ID]])
+---
+# Pass partition pruning metadata to correlated joins.
+N0
+SELECT * FROM t3_n0n1n2 AS out
+WHERE EXISTS (SELECT * FROM t1_n0n1n2 as cor WHERE out.id = cor.id AND EXISTS 
(SELECT 1 FROM t2_n0n1n2 as t2 WHERE t2.id = cor.id))
+---
+Fragment#0 root
+  executionNodes: [N0]
+  remoteFragments: [1, 2, 3]
+  exchangeSourceNodes: {1=[N0, N1, N2], 2=[N0, N1, N2], 3=[N0, N1, N2]}
+  tree:
+    Project
+      CorrelatedNestedLoopJoin
+        Receiver(sourceFragment=1, exchange=1, distribution=single)
+        ColocatedHashAggregate
+          Project
+            CorrelatedNestedLoopJoin
+              Receiver(sourceFragment=2, exchange=2, distribution=single)
+              ColocatedHashAggregate
+                Receiver(sourceFragment=3, exchange=3, distribution=single)
+
+Fragment#3 correlated
+  targetNodes: [N0]
+  executionNodes: [N0, N1, N2]
+  tables: [T2_N0N1N2]
+  partitions: {N0=[0:3], N1=[1:3], N2=[2:3]}
+  pruningMetadata: [4=[{0=$cor1.ID}]]
+  tree:
+    Sender(targetFragment=0, exchange=3, distribution=single)
+      TableScan(name=PUBLIC.T2_N0N1N2, source=4, partitions=3, 
distribution=random)
+
+Fragment#2 correlated
+  targetNodes: [N0]
+  executionNodes: [N0, N1, N2]
+  tables: [T1_N0N1N2]
+  partitions: {N0=[0:3], N1=[1:3], N2=[2:3]}
+  pruningMetadata: [5=[{0=$cor0.ID}]]
+  tree:
+    Sender(targetFragment=0, exchange=2, distribution=single)
+      TableScan(name=PUBLIC.T1_N0N1N2, source=5, partitions=3, 
distribution=affinity[table: T1_N0N1N2, columns: [ID]])
+
+Fragment#1
+  targetNodes: [N0]
+  executionNodes: [N0, N1, N2]
+  tables: [T3_N0N1N2]
+  partitions: {N0=[0:3], N1=[1:3], N2=[2:3]}
+  tree:
+    Sender(targetFragment=0, exchange=1, distribution=single)
+      TableScan(name=PUBLIC.T3_N0N1N2, source=6, partitions=3, 
distribution=affinity[table: T3_N0N1N2, columns: [ID]])
+---
+# Pass partition pruning metadata to correlated joins one layer deep.
+N0
+SELECT * FROM t3_n0n1n2 AS out
+WHERE EXISTS (
+  SELECT * FROM t1_n0n1n2 as cor
+  WHERE out.id = cor.id AND EXISTS (SELECT 1 FROM t2_n0n1n2 as t2 WHERE t2.id 
= out.id or t2.id=cor.id)
+)
+---
+Fragment#0 root
+  executionNodes: [N0]
+  remoteFragments: [1, 2, 3]
+  exchangeSourceNodes: {1=[N0, N1, N2], 2=[N0, N1, N2], 3=[N0, N1, N2]}
+  tree:
+    Project
+      CorrelatedNestedLoopJoin
+        Receiver(sourceFragment=1, exchange=1, distribution=single)
+        ColocatedHashAggregate
+          Project
+            CorrelatedNestedLoopJoin
+              Receiver(sourceFragment=2, exchange=2, distribution=single)
+              ColocatedHashAggregate
+                Receiver(sourceFragment=3, exchange=3, distribution=single)
+
+Fragment#3 correlated
+  targetNodes: [N0]
+  executionNodes: [N0, N1, N2]
+  tables: [T2_N0N1N2]
+  partitions: {N0=[0:3], N1=[1:3], N2=[2:3]}
+  pruningMetadata: [4=[{0=$cor0.ID}, {0=$cor2.ID}]]
+  tree:
+    Sender(targetFragment=0, exchange=3, distribution=single)
+      TableScan(name=PUBLIC.T2_N0N1N2, source=4, partitions=3, 
distribution=random)
+
+Fragment#2 correlated
+  targetNodes: [N0]
+  executionNodes: [N0, N1, N2]
+  tables: [T1_N0N1N2]
+  partitions: {N0=[0:3], N1=[1:3], N2=[2:3]}
+  pruningMetadata: [5=[{0=$cor0.ID}]]
+  tree:
+    Sender(targetFragment=0, exchange=2, distribution=single)
+      TableScan(name=PUBLIC.T1_N0N1N2, source=5, partitions=3, 
distribution=affinity[table: T1_N0N1N2, columns: [ID]])
+
+Fragment#1
+  targetNodes: [N0]
+  executionNodes: [N0, N1, N2]
+  tables: [T3_N0N1N2]
+  partitions: {N0=[0:3], N1=[1:3], N2=[2:3]}
+  tree:
+    Sender(targetFragment=0, exchange=1, distribution=single)
+      TableScan(name=PUBLIC.T3_N0N1N2, source=6, partitions=3, 
distribution=affinity[table: T3_N0N1N2, columns: [ID]])
+---
diff --git 
a/modules/sql-engine/src/test/resources/mapping/test_partition_pruning.test 
b/modules/sql-engine/src/test/resources/mapping/test_partition_pruning.test
index 9b8ce7d85f..1451a4d63b 100644
--- a/modules/sql-engine/src/test/resources/mapping/test_partition_pruning.test
+++ b/modules/sql-engine/src/test/resources/mapping/test_partition_pruning.test
@@ -108,3 +108,75 @@ Fragment#1
         Sort
           TableScan(name=PUBLIC.T1_N1N2N3, source=3, partitions=3, 
distribution=affinity[table: T1_N1N2N3, columns: [ID]])
 ---
+# Correlated.
+# Prune partitions from left arm statically, and pass metadata to the right arm.
+# Same set of nodes.
+N0
+SELECT * FROM t1_n1n2n3 as cor WHERE cor.id = 42 and EXISTS (SELECT 1 FROM 
t3_n1n2n3 as t2 WHERE t2.id = cor.id)
+---
+Fragment#0 root
+  executionNodes: [N0]
+  remoteFragments: [1, 2]
+  exchangeSourceNodes: {1=[N3], 2=[N1, N2, N3]}
+  tree:
+    Project
+      CorrelatedNestedLoopJoin
+        Receiver(sourceFragment=1, exchange=1, distribution=single)
+        ColocatedHashAggregate
+          Receiver(sourceFragment=2, exchange=2, distribution=single)
+
+Fragment#2 correlated
+  targetNodes: [N0]
+  executionNodes: [N1, N2, N3]
+  tables: [T3_N1N2N3]
+  partitions: {N1=[0:3], N2=[1:3], N3=[2:3]}
+  pruningMetadata: [3=[{0=$cor0.ID}]]
+  tree:
+    Sender(targetFragment=0, exchange=2, distribution=single)
+      TableScan(name=PUBLIC.T3_N1N2N3, source=3, partitions=3, 
distribution=random)
+
+Fragment#1
+  targetNodes: [N0]
+  executionNodes: [N3]
+  tables: [T1_N1N2N3]
+  partitions: {N3=[2:3]}
+  tree:
+    Sender(targetFragment=0, exchange=1, distribution=single)
+      TableScan(name=PUBLIC.T1_N1N2N3, source=4, partitions=3, 
distribution=affinity[table: T1_N1N2N3, columns: [ID]])
+---
+# Correlated.
+# Prune partitions from left arm statically, and pass metadata to the right arm.
+# Different sets of nodes.
+N0
+SELECT * FROM t1_n1n2n3 as cor WHERE cor.id = 42 and EXISTS (SELECT 1 FROM 
t2_n4n5 as t2 WHERE t2.id = cor.id)
+---
+Fragment#0 root
+  executionNodes: [N0]
+  remoteFragments: [1, 2]
+  exchangeSourceNodes: {1=[N3], 2=[N4, N5]}
+  tree:
+    Project
+      CorrelatedNestedLoopJoin
+        Receiver(sourceFragment=1, exchange=1, distribution=single)
+        ColocatedHashAggregate
+          Receiver(sourceFragment=2, exchange=2, distribution=single)
+
+Fragment#2 correlated
+  targetNodes: [N0]
+  executionNodes: [N4, N5]
+  tables: [T2_N4N5]
+  partitions: {N4=[0:2], N5=[1:2]}
+  pruningMetadata: [3=[{0=$cor0.ID}]]
+  tree:
+    Sender(targetFragment=0, exchange=2, distribution=single)
+      TableScan(name=PUBLIC.T2_N4N5, source=3, partitions=2, 
distribution=random)
+
+Fragment#1
+  targetNodes: [N0]
+  executionNodes: [N3]
+  tables: [T1_N1N2N3]
+  partitions: {N3=[2:3]}
+  tree:
+    Sender(targetFragment=0, exchange=1, distribution=single)
+      TableScan(name=PUBLIC.T1_N1N2N3, source=4, partitions=3, 
distribution=affinity[table: T1_N1N2N3, columns: [ID]])
+---

Reply via email to