This is an automated email from the ASF dual-hosted git repository.

korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 732cfddb0e IGNITE-22466 Sql. Support mapping to non-primary replicas 
(#3972)
732cfddb0e is described below

commit 732cfddb0eaae071e7af44aa147d28cfe551de7f
Author: korlov42 <[email protected]>
AuthorDate: Tue Jun 25 17:06:27 2024 +0300

    IGNITE-22466 Sql. Support mapping to non-primary replicas (#3972)
---
 modules/sql-engine/build.gradle                    |   1 +
 .../sql/engine/ExecutionTargetProviderImpl.java    | 173 +++++++++++
 .../internal/sql/engine/SqlQueryProcessor.java     |  94 +-----
 .../sql/engine/exec/ExecutionServiceImpl.java      |   3 +-
 .../sql/engine/exec/mapping/ExecutionTarget.java   |  10 +
 .../exec/mapping/ExecutionTargetFactory.java       |   5 +-
 .../exec/mapping/ExecutionTargetProvider.java      |  10 +-
 .../sql/engine/exec/mapping/FragmentMapper.java    | 317 ++++++++++++++++++++-
 .../sql/engine/exec/mapping/FragmentMapping.java   |  11 +-
 .../sql/engine/exec/mapping/FragmentSplitter.java  |   2 +-
 .../sql/engine/exec/mapping/MappingParameters.java |  20 +-
 .../engine/exec/mapping/MappingServiceImpl.java    | 217 ++++++--------
 .../exec/mapping/smallcluster/AllOfTarget.java     |   5 +
 .../exec/mapping/smallcluster/OneOfTarget.java     |  15 +
 .../mapping/smallcluster/PartitionedTarget.java    |  51 ++++
 .../mapping/smallcluster/SmallClusterFactory.java  |  29 +-
 .../exec/mapping/smallcluster/SomeOfTarget.java    |  15 +
 .../internal/sql/engine/prepare/Fragment.java      |  10 +
 .../sql/engine/exec/ExecutionServiceImplTest.java  |  50 ++--
 .../engine/exec/mapping/FragmentMappingTest.java   |  97 +++++--
 .../exec/mapping/MappingServiceImplTest.java       |  31 +-
 .../sql/engine/exec/mapping/MappingTestRunner.java |  12 +-
 .../sql/engine/framework/TestBuilders.java         |  48 ++--
 .../src/test/resources/mapping/correlated.test     | 136 ++++-----
 .../src/test/resources/mapping/hash_join.test      |  31 +-
 .../src/test/resources/mapping/merge_join.test     |  66 +----
 .../src/test/resources/mapping/set_ops.test        |  18 +-
 .../src/test/resources/mapping/table_identity.test |  68 ++---
 .../resources/mapping/table_identity_single.test   |  60 ++--
 .../src/test/resources/mapping/table_single.test   |  80 +++---
 .../resources/mapping/test_backup_mapping.test     | 162 +++++++++++
 .../resources/mapping/test_partition_pruning.test  |  72 ++---
 .../src/test/resources/mapping/union.test          |  30 +-
 33 files changed, 1337 insertions(+), 612 deletions(-)

diff --git a/modules/sql-engine/build.gradle b/modules/sql-engine/build.gradle
index 0d7dab1fbd..8f0804e08c 100644
--- a/modules/sql-engine/build.gradle
+++ b/modules/sql-engine/build.gradle
@@ -50,6 +50,7 @@ dependencies {
     implementation project(':ignite-failure-handler')
     implementation project(':ignite-placement-driver-api')
     implementation project(':ignite-partition-replicator')
+    implementation project(':ignite-affinity')
     implementation libs.jetbrains.annotations
     implementation libs.fastutil.core
     implementation libs.caffeine
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/ExecutionTargetProviderImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/ExecutionTargetProviderImpl.java
new file mode 100644
index 0000000000..3992941bb3
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/ExecutionTargetProviderImpl.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static 
org.apache.ignite.internal.table.distributed.storage.InternalTableImpl.AWAIT_PRIMARY_REPLICA_TIMEOUT;
+import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
+import static 
org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.affinity.TokenizedAssignments;
+import org.apache.ignite.internal.affinity.TokenizedAssignmentsImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
+import 
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory;
+import 
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetProvider;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.apache.ignite.internal.systemview.api.SystemViewManager;
+import org.apache.ignite.lang.ErrorGroups.Sql;
+import org.apache.ignite.sql.SqlException;
+
+/**
+ * Implementation of {@link ExecutionTargetProvider} which takes assignments 
from {@link PlacementDriver} and {@link SystemViewManager}.
+ */
+public class ExecutionTargetProviderImpl implements ExecutionTargetProvider {
+    private static final IgniteLogger LOG = 
Loggers.forClass(ExecutionTargetProviderImpl.class);
+
+    private final PlacementDriver placementDriver;
+    private final SystemViewManager systemViewManager;
+
+    ExecutionTargetProviderImpl(
+            PlacementDriver placementDriver, SystemViewManager 
systemViewManager
+    ) {
+        this.placementDriver = placementDriver;
+        this.systemViewManager = systemViewManager;
+    }
+
+    @Override
+    public CompletableFuture<ExecutionTarget> forTable(
+            HybridTimestamp operationTime,
+            ExecutionTargetFactory factory,
+            IgniteTable table,
+            boolean includeBackups
+    ) {
+        return collectAssignments(table, operationTime, includeBackups)
+                .thenApply(factory::partitioned);
+    }
+
+    @Override
+    public CompletableFuture<ExecutionTarget> 
forSystemView(ExecutionTargetFactory factory, IgniteSystemView view) {
+        List<String> nodes = systemViewManager.owningNodes(view.name());
+
+        if (nullOrEmpty(nodes)) {
+            return failedFuture(
+                    new SqlException(Sql.MAPPING_ERR, format("The view with 
name '{}' could not be found on"
+                            + " any active nodes in the cluster", view.name()))
+            );
+        }
+
+        return completedFuture(
+                view.distribution() == IgniteDistributions.single()
+                        ? factory.oneOf(nodes)
+                        : factory.allOf(nodes)
+        );
+    }
+
+    // needs to be refactored; see TODO: 
https://issues.apache.org/jira/browse/IGNITE-20925
+    /** Collects per-partition assignments: all replicas when includeBackups 
is set, otherwise primary replicas only. */
+    private CompletableFuture<List<TokenizedAssignments>> collectAssignments(
+            IgniteTable table, HybridTimestamp operationTime, boolean 
includeBackups
+    ) {
+        int partitions = table.partitions();
+
+        List<CompletableFuture<TokenizedAssignments>> result = new 
ArrayList<>(partitions);
+
+        // no need to wait for all partitions once pruning is implemented.
+        for (int partId = 0; partId < partitions; ++partId) {
+            ReplicationGroupId partGroupId = new TablePartitionId(table.id(), 
partId);
+
+            CompletableFuture<TokenizedAssignments> partitionAssignment = 
includeBackups
+                    ? allReplicas(partGroupId, operationTime)
+                    : primaryReplica(partGroupId, operationTime);
+
+            result.add(partitionAssignment);
+        }
+
+        CompletableFuture<Void> all = 
CompletableFuture.allOf(result.toArray(new CompletableFuture[0]));
+
+        return all.thenApply(v -> result.stream()
+                .map(CompletableFuture::join)
+                .collect(Collectors.toList())
+        );
+    }
+
+    private CompletableFuture<TokenizedAssignments> primaryReplica(
+            ReplicationGroupId replicationGroupId,
+            HybridTimestamp operationTime
+    ) {
+        CompletableFuture<ReplicaMeta> f = placementDriver.awaitPrimaryReplica(
+                replicationGroupId,
+                operationTime,
+                AWAIT_PRIMARY_REPLICA_TIMEOUT,
+                SECONDS
+        );
+
+        return f.handle((primaryReplica, e) -> {
+            if (e != null) {
+                LOG.debug("Failed to retrieve primary replica for partition 
{}", e, replicationGroupId);
+
+                throw withCause(IgniteInternalException::new, 
REPLICA_UNAVAILABLE_ERR, "Failed to get the primary replica"
+                        + " [tablePartitionId=" + replicationGroupId + ']', e);
+            } else {
+                String holder = primaryReplica.getLeaseholder();
+
+                assert holder != null : "Unable to map query, nothing holds 
the lease";
+
+                return new 
TokenizedAssignmentsImpl(Set.of(Assignment.forPeer(holder)), 
primaryReplica.getStartTime().longValue());
+            }
+        });
+    }
+
+    private CompletableFuture<TokenizedAssignments> allReplicas(
+            ReplicationGroupId replicationGroupId,
+            HybridTimestamp operationTime
+    ) {
+        CompletableFuture<TokenizedAssignments> f = 
placementDriver.getAssignments(
+                replicationGroupId,
+                operationTime
+        );
+
+        return f.thenCompose(assignments -> {
+            if (assignments == null) {
+                // assignments are not ready yet, let's fall back to primary 
replicas
+                return primaryReplica(replicationGroupId, operationTime);
+            }
+
+            return completedFuture(assignments);
+        });
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 8daf6897dd..477496eb63 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -19,16 +19,11 @@ package org.apache.ignite.internal.sql.engine;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
-import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static 
org.apache.ignite.internal.lang.SqlExceptionMapperUtil.mapToPublicSqlException;
-import static 
org.apache.ignite.internal.table.distributed.storage.InternalTableImpl.AWAIT_PRIMARY_REPLICA_TIMEOUT;
-import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
-import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
 import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
 import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
-import static 
org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Sql.EXECUTION_CANCELLED_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Sql.RUNTIME_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Sql.STMT_VALIDATION_ERR;
@@ -48,7 +43,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.LongSupplier;
-import java.util.stream.Collectors;
 import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.CatalogManager;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
@@ -57,17 +51,12 @@ import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.NodeStoppingException;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.metrics.MetricManager;
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
-import org.apache.ignite.internal.placementdriver.ReplicaMeta;
 import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
 import org.apache.ignite.internal.replicator.ReplicaService;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
-import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.schema.SchemaManager;
 import org.apache.ignite.internal.sql.ResultSetMetadataImpl;
 import org.apache.ignite.internal.sql.SqlCommon;
@@ -81,15 +70,11 @@ import 
org.apache.ignite.internal.sql.engine.exec.ExecutionService;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionServiceImpl;
 import org.apache.ignite.internal.sql.engine.exec.LifecycleAware;
 import org.apache.ignite.internal.sql.engine.exec.MailboxRegistryImpl;
-import org.apache.ignite.internal.sql.engine.exec.NodeWithConsistencyToken;
 import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
 import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl;
 import org.apache.ignite.internal.sql.engine.exec.SqlRowHandler;
 import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
 import 
org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunctionRegistryImpl;
-import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
-import 
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory;
-import 
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetProvider;
 import org.apache.ignite.internal.sql.engine.exec.mapping.MappingServiceImpl;
 import org.apache.ignite.internal.sql.engine.message.MessageServiceImpl;
 import org.apache.ignite.internal.sql.engine.prepare.KeyValueGetPlan;
@@ -102,14 +87,11 @@ import 
org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
 import 
org.apache.ignite.internal.sql.engine.prepare.pruning.PartitionPrunerImpl;
 import org.apache.ignite.internal.sql.engine.property.SqlProperties;
 import org.apache.ignite.internal.sql.engine.property.SqlPropertiesHelper;
-import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView;
-import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
 import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
 import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManagerImpl;
 import org.apache.ignite.internal.sql.engine.sql.ParsedResult;
 import org.apache.ignite.internal.sql.engine.sql.ParserService;
 import org.apache.ignite.internal.sql.engine.sql.ParserServiceImpl;
-import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.sql.engine.tx.QueryTransactionContext;
 import org.apache.ignite.internal.sql.engine.tx.QueryTransactionContextImpl;
 import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapper;
@@ -131,7 +113,6 @@ import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
-import org.apache.ignite.lang.ErrorGroups.Sql;
 import org.apache.ignite.sql.ResultSetMetadata;
 import org.apache.ignite.sql.SqlException;
 import org.jetbrains.annotations.Nullable;
@@ -144,9 +125,6 @@ public class SqlQueryProcessor implements QueryProcessor {
     /** Default time-zone ID. */
     public static final ZoneId DEFAULT_TIME_ZONE_ID = ZoneId.of("UTC");
 
-    /** The logger. */
-    private static final IgniteLogger LOG = 
Loggers.forClass(SqlQueryProcessor.class);
-
     private static final int PARSED_RESULT_CACHE_SIZE = 10_000;
 
     /** Size of the table access cache. */
@@ -330,36 +308,13 @@ public class SqlQueryProcessor implements QueryProcessor {
                 view -> () -> systemViewManager.scanView(view.name())
         );
 
-        var executionTargetProvider = new ExecutionTargetProvider() {
-            @Override
-            public CompletableFuture<ExecutionTarget> 
forTable(ExecutionTargetFactory factory, IgniteTable table) {
-                return primaryReplicas(table)
-                        .thenApply(factory::partitioned);
-            }
-
-            @Override
-            public CompletableFuture<ExecutionTarget> 
forSystemView(ExecutionTargetFactory factory, IgniteSystemView view) {
-                List<String> nodes = 
systemViewManager.owningNodes(view.name());
-
-                if (nullOrEmpty(nodes)) {
-                    return failedFuture(
-                            new SqlException(Sql.MAPPING_ERR, format("The view 
with name '{}' could not be found on"
-                                    + " any active nodes in the cluster", 
view.name()))
-                    );
-                }
-
-                return completedFuture(
-                        view.distribution() == IgniteDistributions.single()
-                                ? factory.oneOf(nodes)
-                                : factory.allOf(nodes)
-                );
-            }
-        };
+        var executionTargetProvider = new 
ExecutionTargetProviderImpl(placementDriver, systemViewManager);
 
         var partitionPruner = new PartitionPrunerImpl();
 
         var mappingService = new MappingServiceImpl(
                 nodeName,
+                clockService,
                 executionTargetProvider,
                 CACHE_FACTORY,
                 clusterCfg.planner().estimatedNumberOfQueries().value(),
@@ -397,51 +352,6 @@ public class SqlQueryProcessor implements QueryProcessor {
         return nullCompletedFuture();
     }
 
-    // need to be refactored after TODO: 
https://issues.apache.org/jira/browse/IGNITE-20925
-    /** Get primary replicas. */
-    private CompletableFuture<List<NodeWithConsistencyToken>> 
primaryReplicas(IgniteTable table) {
-        int partitions = table.partitions();
-
-        List<CompletableFuture<NodeWithConsistencyToken>> result = new 
ArrayList<>(partitions);
-
-        HybridTimestamp clockNow = clockService.now();
-
-        // no need to wait all partitions after pruning was implemented.
-        for (int partId = 0; partId < partitions; ++partId) {
-            int partitionId = partId;
-            ReplicationGroupId partGroupId = new TablePartitionId(table.id(), 
partitionId);
-
-            CompletableFuture<ReplicaMeta> f = 
placementDriver.awaitPrimaryReplica(
-                    partGroupId,
-                    clockNow,
-                    AWAIT_PRIMARY_REPLICA_TIMEOUT,
-                    SECONDS
-            );
-
-            result.add(f.handle((primaryReplica, e) -> {
-                if (e != null) {
-                    LOG.debug("Failed to retrieve primary replica for 
partition {}", e, partitionId);
-
-                    throw withCause(IgniteInternalException::new, 
REPLICA_UNAVAILABLE_ERR, "Failed to get the primary replica"
-                            + " [tablePartitionId=" + partGroupId + ']', e);
-                } else {
-                    String holder = primaryReplica.getLeaseholder();
-
-                    assert holder != null : "Unable to map query, nothing 
holds the lease";
-
-                    return new NodeWithConsistencyToken(holder, 
primaryReplica.getStartTime().longValue());
-                }
-            }));
-        }
-
-        CompletableFuture<Void> all = 
CompletableFuture.allOf(result.toArray(new CompletableFuture[0]));
-
-        return all.thenApply(v -> result.stream()
-                .map(CompletableFuture::join)
-                .collect(Collectors.toList())
-        );
-    }
-
     /** {@inheritDoc} */
     @Override
     public synchronized CompletableFuture<Void> stopAsync(ComponentContext 
componentContext) {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index a87817d0dc..be4d7705f6 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -930,7 +930,8 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
         private AsyncCursor<InternalSqlRow> execute(InternalTransaction tx, 
MultiStepPlan multiStepPlan) {
             assert root != null;
 
-            MappingParameters mappingParameters = 
MappingParameters.create(ctx.parameters());
+            boolean mapOnBackups = tx.isReadOnly();
+            MappingParameters mappingParameters = 
MappingParameters.create(ctx.parameters(), mapOnBackups);
 
             mappingService.map(multiStepPlan, 
mappingParameters).whenCompleteAsync((mappedFragments, mappingErr) -> {
                 if (mappingErr != null) {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTarget.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTarget.java
index fbc24eca92..47f629fc2c 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTarget.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTarget.java
@@ -45,6 +45,16 @@ public interface ExecutionTarget {
      */
     ExecutionTarget colocateWith(ExecutionTarget other) throws 
ColocationMappingException;
 
+    /**
+     * Removes options from the current target which are not colocated with 
the other target.
+     *
+     * <p>If the target has several options, removes those that are not 
present in the given target to improve colocation.
+     *
+     * @param other Target with which the current target needs to be 
colocated.
+     * @return A new target if the current one has been adjusted, or 
{@code this} instance otherwise.
+     */
+    ExecutionTarget trimTo(ExecutionTarget other);
+
     /**
      * Finalises target by choosing exactly one node for targets with multiple 
options.
      *
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactory.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactory.java
index 08b216725b..e3a02547ef 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactory.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactory.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.sql.engine.exec.mapping;
 
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import java.util.List;
+import org.apache.ignite.internal.affinity.TokenizedAssignments;
 import org.apache.ignite.internal.sql.engine.exec.NodeWithConsistencyToken;
 
 /**
@@ -28,10 +29,10 @@ public interface ExecutionTargetFactory {
     /**
      * Creates target from list of primary partitions.
      *
-     * @param nodes List of partitions.
+     * @param assignments List of assignments.
      * @return An execution target.
      */
-    ExecutionTarget partitioned(List<NodeWithConsistencyToken> nodes);
+    ExecutionTarget partitioned(List<TokenizedAssignments> assignments);
 
     /**
      * Creates target from list of required nodes.
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetProvider.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetProvider.java
index fa6f3a343c..d131664cf1 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetProvider.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetProvider.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.sql.engine.exec.mapping;
 
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView;
 import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
 
@@ -29,11 +30,18 @@ public interface ExecutionTargetProvider {
     /**
      * Returns an execution target for a given table.
      *
+     * @param operationTime Time of the operation to get consistent results 
among different calls.
      * @param factory A factory to create target for given table.
      * @param table A table to create execution target for.
+     * @param includeBackups Flag denoting whether to include non-primary 
replicas in the target.
      * @return A future representing the result.
      */
-    CompletableFuture<ExecutionTarget> forTable(ExecutionTargetFactory 
factory, IgniteTable table);
+    CompletableFuture<ExecutionTarget> forTable(
+            HybridTimestamp operationTime,
+            ExecutionTargetFactory factory,
+            IgniteTable table,
+            boolean includeBackups
+    );
 
     /**
      * Returns an execution target for a given view.
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapper.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapper.java
index d466d9801d..271a7055bb 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapper.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapper.java
@@ -17,18 +17,31 @@
 
 package org.apache.ignite.internal.sql.engine.exec.mapping;
 
+import static org.apache.ignite.internal.util.CollectionUtils.first;
+import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2LongOpenHashMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.longs.LongList;
 import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
 import it.unimi.dsi.fastutil.longs.LongSet;
 import it.unimi.dsi.fastutil.longs.LongSets;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Queue;
+import java.util.stream.Collectors;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.rel.BiRel;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.SingleRel;
 import org.apache.calcite.rel.core.SetOp;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.sql.engine.exec.NodeWithConsistencyToken;
 import org.apache.ignite.internal.sql.engine.metadata.RelMetadataQueryEx;
 import org.apache.ignite.internal.sql.engine.prepare.Fragment;
@@ -69,11 +82,15 @@ import 
org.apache.ignite.internal.sql.engine.rel.set.IgniteSetOp;
 import org.apache.ignite.internal.sql.engine.schema.IgniteDataSource;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
+import org.apache.ignite.internal.util.Pair;
+import org.apache.ignite.lang.ErrorGroups.Sql;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * A mapper that traverse a fragment tree and calculates a mapping.
  */
 class FragmentMapper {
+    private static final int MAPPING_ATTEMPTS = 3;
 
     private final RelMetadataQuery mq;
 
@@ -89,13 +106,7 @@ class FragmentMapper {
         this.targets = targets;
     }
 
-    /**
-     * Computes mapping for the given fragment.
-     *
-     * @param fragment A fragment to compute mapping for.
-     * @return Fragment meta information.
-     */
-    public FragmentMapping map(Fragment fragment) throws 
FragmentMappingException {
+    private Mapping map(Fragment fragment) throws FragmentMappingException {
         Mapping mapping = fragment.root().accept(new MapperVisitor());
 
         if (fragment.single()) {
@@ -125,7 +136,187 @@ class FragmentMapper {
             }
         }
 
-        return new FragmentMapping(mapping.createColocationGroups());
+        mapping.validate();
+
+        return mapping;
+    }
+
+    /**
+     * Computes mapping for the given list of fragments.
+     *
+     * @param fragments Fragments to compute mapping for.
+     * @param idGenerator Identifier generator used to provide ids for new 
fragments which were split from the original one in case the mapper
+     *         considers the fragment impossible to colocate.
+     * @return Fragment meta information.
+     */
+    public List<FragmentMapping> map(List<Fragment> fragments, IdGenerator 
idGenerator) {
+        Exception ex = null;
+        boolean lastAttemptSucceed = false;
+        Long2ObjectMap<Pair<Fragment, Mapping>> mappingByFragmentId = new 
Long2ObjectOpenHashMap<>();
+        for (int attempt = 0; attempt < MAPPING_ATTEMPTS && 
!lastAttemptSucceed; attempt++) {
+            Fragment currentFragment = null;
+            try {
+                for (Fragment fragment : fragments) {
+                    currentFragment = fragment;
+
+                    if 
(mappingByFragmentId.containsKey(fragment.fragmentId())) {
+                        continue;
+                    }
+
+                    mappingByFragmentId.put(fragment.fragmentId(), new 
Pair<>(fragment, map(fragment)));
+                }
+
+                lastAttemptSucceed = true;
+            } catch (FragmentMappingException mappingException) {
+                if (ex == null) {
+                    ex = mappingException;
+                } else {
+                    ex.addSuppressed(mappingException);
+                }
+
+                fragments = replace(
+                        fragments,
+                        currentFragment,
+                        new FragmentSplitter(idGenerator, 
mappingException.node()).go(currentFragment)
+                );
+            }
+        }
+
+        if (!lastAttemptSucceed) {
+            throw new IgniteInternalException(Sql.MAPPING_ERR, "Unable to map 
query: " + ex.getMessage(), ex);
+        }
+
+        return adjustMapping(mappingByFragmentId).stream()
+                .map(pair -> new FragmentMapping(pair.getFirst(), 
pair.getSecond().createColocationGroups()))
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Attempt to colocate different fragments with each other.
+     *
+     * <p>Current implementation assumes that intermediate results become 
smaller on every step, thus it prioritizes colocation of children
+     * over colocation of parents. This assumption is not always correct, but 
it will be addressed later by a more sophisticated algorithm.
+     *
+     * @param mappingByFragmentId Fragments to map.
+     * @return List of pairs of fragments to their mapping metadata.
+     */
+    private static List<Pair<Fragment, Mapping>> adjustMapping(
+            Long2ObjectMap<Pair<Fragment, Mapping>> mappingByFragmentId
+    ) {
+        LongList fragmentIds = collectFragmentIds(mappingByFragmentId);
+
+        /*
+         Current implementation adjusts colocation in two passes. First pass 
is bottom-up, here we
+         start from leaf nodes and adjust execution targets pairwise up to the 
root. Second pass is
+         top-down, here we try to improve colocation of an entire plan with 
root fragment.
+         */
+
+        // first pass is bottom-up
+        // the very first fragment is the root, which doesn't have any parents, 
so let's just skip it
+        for (int i = fragmentIds.size() - 1; i > 0; i--) {
+            Pair<Fragment, Mapping> currentPair = 
mappingByFragmentId.get(fragmentIds.getLong(i));
+
+            Mapping currentMapping = currentPair.getSecond();
+
+            Long targetFragmentId = currentPair.getFirst().targetFragmentId();
+
+            assert targetFragmentId != null;
+
+            Pair<Fragment, Mapping> parentPair = 
mappingByFragmentId.get((long) targetFragmentId);
+
+            Mapping parentMapping = parentPair.getSecond();
+
+            { // to reduce visibility scope of `newCurrentMapping`
+                Mapping newCurrentMapping = 
currentMapping.bestEffortColocate(parentMapping);
+                if (newCurrentMapping != null) {
+                    Fragment currentFragment = currentPair.getFirst();
+
+                    mappingByFragmentId.put(currentFragment.fragmentId(), new 
Pair<>(currentFragment, newCurrentMapping));
+                    currentMapping = newCurrentMapping;
+                }
+            }
+
+            { // to reduce visibility scope of `newParentMapping`
+                Mapping newParentMapping = 
parentMapping.bestEffortColocate(currentMapping);
+                if (newParentMapping != null) {
+                    Fragment parentFragment = parentPair.getFirst();
+
+                    mappingByFragmentId.put(parentFragment.fragmentId(), new 
Pair<>(parentFragment, newParentMapping));
+                }
+            }
+        }
+
+        // second pass is top-down
+        // the root fragment has already been processed on the previous pass, 
thus let's skip it
+        for (int i = 1; i < fragmentIds.size(); i++) {
+            Pair<Fragment, Mapping> currentPair = 
mappingByFragmentId.get(fragmentIds.getLong(i));
+
+            Mapping currentMapping = currentPair.getSecond();
+
+            for (IgniteReceiver receiver : currentPair.getFirst().remotes()) {
+                Pair<Fragment, Mapping> childPair = 
mappingByFragmentId.get(receiver.sourceFragmentId());
+
+                Mapping childMapping = childPair.getSecond();
+
+                { // to reduce visibility scope of `newCurrentMapping`
+                    Mapping newCurrentMapping = 
currentMapping.bestEffortColocate(childMapping);
+                    if (newCurrentMapping != null) {
+                        Fragment currentFragment = currentPair.getFirst();
+
+                        mappingByFragmentId.put(currentFragment.fragmentId(), 
new Pair<>(currentFragment, newCurrentMapping));
+                        currentMapping = newCurrentMapping;
+                    }
+                }
+
+                { // to reduce visibility scope of `newChildMapping`
+                    Mapping newChildMapping = 
childMapping.bestEffortColocate(currentMapping);
+                    if (newChildMapping != null) {
+                        Fragment childFragment = childPair.getFirst();
+
+                        mappingByFragmentId.put(childFragment.fragmentId(), 
new Pair<>(childFragment, newChildMapping));
+                    }
+                }
+            }
+        }
+
+        return fragmentIds.longStream()
+                .mapToObj(mappingByFragmentId::get)
+                .collect(Collectors.toList());
+    }
+
+    private static LongList collectFragmentIds(Long2ObjectMap<Pair<Fragment, 
Mapping>> mappingByFragmentId) {
+        LongList fragmentIds = new LongArrayList();
+        Queue<Fragment> toProcess = new LinkedList<>();
+
+        Fragment root = findRootFragment(mappingByFragmentId.values());
+
+        toProcess.add(root);
+
+        while (!toProcess.isEmpty()) {
+            Fragment current = toProcess.poll();
+
+            fragmentIds.add(current.fragmentId());
+
+            for (IgniteReceiver receiver : current.remotes()) {
+                
toProcess.add(mappingByFragmentId.get(receiver.sourceFragmentId()).getFirst());
+            }
+        }
+        return fragmentIds;
+    }
+
+    private static Fragment findRootFragment(Collection<Pair<Fragment, 
Mapping>> fragments) {
+        Fragment root = null;
+        for (Pair<Fragment, Mapping> pair : fragments) {
+            if (pair.getFirst().rootFragment()) {
+                assert root == null;
+
+                root = pair.getFirst();
+            }
+        }
+
+        assert root != null;
+
+        return root;
     }
 
     private class MapperVisitor implements IgniteRelVisitor<Mapping> {
@@ -339,6 +530,9 @@ class FragmentMapper {
                 RelOptCost leftVarCost = mq.getCumulativeCost(leftVar);
                 RelOptCost rightVarCost = mq.getCumulativeCost(rightVar);
 
+                assert leftVarCost != null;
+                assert rightVarCost != null;
+
                 if (leftVarCost.isLt(rightVarCost)) {
                     return new FailedMapping(new 
FragmentMappingException(e.getMessage(), left, e));
                 } else {
@@ -351,10 +545,9 @@ class FragmentMapper {
          * See {@link MapperVisitor#mapSingleRel(SingleRel)}
          *
          * <p>{@link ColocationMappingException} may be thrown on two children 
nodes locations merge. This means that the
-         * fragment (which part the parent node is) cannot be executed on any 
node and additional exchange is needed.
-         * This case we throw {@link FragmentMappingException} with an edge, 
where we need the additional exchange.
-         * After the exchange is put into the fragment and the fragment is 
split into two ones, fragment meta information
-         * will be recalculated for all fragments.
+         * fragment (which the parent node is part of) cannot be executed on any 
node and an additional exchange is needed. In this case we throw
+         * {@link FragmentMappingException} with an edge where we need the 
additional exchange. After the exchange is put into the fragment
+         * and the fragment is split into two, fragment meta information 
will be recalculated for all fragments.
          */
         private Mapping mapSetOp(SetOp rel) {
             if (TraitUtils.distribution(rel) == IgniteDistributions.random()) {
@@ -387,6 +580,8 @@ class FragmentMapper {
                     }
                 }
 
+                assert res != null : "SetOp without inputs";
+
                 return res;
             }
         }
@@ -455,7 +650,11 @@ class FragmentMapper {
 
         Mapping colocate(Mapping other) throws ColocationMappingException;
 
-        List<ColocationGroup> createColocationGroups() throws 
FragmentMappingException;
+        @Nullable Mapping bestEffortColocate(Mapping other);
+
+        void validate() throws FragmentMappingException;
+
+        List<ColocationGroup> createColocationGroups();
     }
 
     private static class FailedMapping implements Mapping {
@@ -481,9 +680,19 @@ class FragmentMapper {
         }
 
         @Override
-        public List<ColocationGroup> createColocationGroups() throws 
FragmentMappingException {
+        public @Nullable Mapping bestEffortColocate(Mapping other) {
+            return null;
+        }
+
+        @Override
+        public void validate() throws FragmentMappingException {
             throw exception;
         }
+
+        @Override
+        public List<ColocationGroup> createColocationGroups() {
+            throw new AssertionError("Should not be called");
+        }
     }
 
     private static class CombinedMapping implements Mapping {
@@ -509,7 +718,19 @@ class FragmentMapper {
         }
 
         @Override
-        public List<ColocationGroup> createColocationGroups() throws 
FragmentMappingException {
+        public @Nullable Mapping bestEffortColocate(Mapping other) {
+            return null;
+        }
+
+        @Override
+        public void validate() throws FragmentMappingException {
+            for (Mapping mapping : mappings) {
+                mapping.validate();
+            }
+        }
+
+        @Override
+        public List<ColocationGroup> createColocationGroups() {
             List<ColocationGroup> groups = new ArrayList<>();
 
             for (Mapping mapping : mappings) {
@@ -557,6 +778,30 @@ class FragmentMapper {
             return new ColocatedMapping(LongSets.unmodifiable(sourceIds), 
colocatedTarget);
         }
 
+        @Override
+        public @Nullable Mapping bestEffortColocate(Mapping other) {
+            if (!other.colocated()) {
+                return null;
+            }
+
+            assert other instanceof ColocatedMapping : 
other.getClass().getCanonicalName();
+
+            ColocatedMapping colocatedMapping = (ColocatedMapping) other;
+
+            ExecutionTarget newTarget = target.trimTo(colocatedMapping.target);
+
+            if (newTarget == target) {
+                return null;
+            }
+
+            return new ColocatedMapping(this.sourceIds, newTarget);
+        }
+
+        @Override
+        public void validate() {
+            // colocated mapping always valid
+        }
+
         @Override
         public List<ColocationGroup> createColocationGroups() {
             ExecutionTarget finalised = target.finalise();
@@ -573,4 +818,46 @@ class FragmentMapper {
             );
         }
     }
+
+    private static List<Fragment> replace(
+            List<Fragment> originalFragments,
+            Fragment fragmentToReplace,
+            List<Fragment> replacement
+    ) {
+        assert !nullOrEmpty(replacement);
+
+        Long2LongOpenHashMap newTargets = new Long2LongOpenHashMap();
+        for (Fragment fragment0 : replacement) {
+            for (IgniteReceiver remote : fragment0.remotes()) {
+                newTargets.put(remote.exchangeId(), fragment0.fragmentId());
+            }
+        }
+
+        List<Fragment> newFragments = new ArrayList<>(originalFragments.size() 
+ replacement.size() - 1);
+        for (Fragment fragment : originalFragments) {
+            if (fragment == fragmentToReplace) {
+                //noinspection AssignmentToForLoopParameter
+                fragment = first(replacement);
+            } else if (!fragment.rootFragment()) {
+                IgniteSender sender = (IgniteSender) fragment.root();
+
+                long newTargetId = 
newTargets.getOrDefault(sender.exchangeId(), Long.MIN_VALUE);
+
+                if (newTargetId != Long.MIN_VALUE) {
+                    sender = new IgniteSender(sender.getCluster(), 
sender.getTraitSet(),
+                            sender.getInput(), sender.exchangeId(), 
newTargetId, sender.distribution());
+
+                    //noinspection AssignmentToForLoopParameter
+                    fragment = new Fragment(fragment.fragmentId(), 
fragment.correlated(), sender,
+                            fragment.remotes(), fragment.tables(), 
fragment.systemViews());
+                }
+            }
+
+            newFragments.add(fragment);
+        }
+
+        newFragments.addAll(replacement.subList(1, replacement.size()));
+
+        return newFragments;
+    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapping.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapping.java
index dba2b4967c..16e20ea5e5 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapping.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapping.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.sql.engine.exec.mapping;
 
 import java.util.List;
+import org.apache.ignite.internal.sql.engine.prepare.Fragment;
 
 /**
  * Intermediate result returned by {@link FragmentMapper}.
@@ -30,13 +31,19 @@ import java.util.List;
  * <p>That's why we need additional container here.
  */
 class FragmentMapping {
+    private final Fragment fragment;
     private final List<ColocationGroup> groups;
 
-    FragmentMapping(List<ColocationGroup> groups) {
+    FragmentMapping(Fragment fragment, List<ColocationGroup> groups) {
+        this.fragment = fragment;
         this.groups = groups;
     }
 
-    public List<ColocationGroup> groups() {
+    List<ColocationGroup> groups() {
         return groups;
     }
+
+    Fragment fragment() {
+        return fragment;
+    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentSplitter.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentSplitter.java
index becf16d557..a8105ec9eb 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentSplitter.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentSplitter.java
@@ -68,7 +68,7 @@ class FragmentSplitter extends IgniteRelShuttle {
     public List<Fragment> go(Fragment fragment) {
         ArrayList<Fragment> res = new ArrayList<>();
 
-        stack.push(new FragmentProto(idGenerator.nextId(), 
fragment.correlated(), fragment.root()));
+        stack.push(new FragmentProto(fragment.fragmentId(), 
fragment.correlated(), fragment.root()));
 
         while (!stack.isEmpty()) {
             curr = stack.pop();
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingParameters.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingParameters.java
index 014cd4add1..1f586c4541 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingParameters.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingParameters.java
@@ -23,31 +23,41 @@ import org.apache.ignite.internal.util.ArrayUtils;
 public class MappingParameters {
 
     /** Empty mapping parameters. */
-    public static final MappingParameters EMPTY = new 
MappingParameters(ArrayUtils.OBJECT_EMPTY_ARRAY);
+    public static final MappingParameters EMPTY = new 
MappingParameters(ArrayUtils.OBJECT_EMPTY_ARRAY, false);
 
+    private final boolean mapOnBackups;
     private final Object[] dynamicParameters;
 
     /**
      * Creates mapping parameters.
      *
      * @param dynamicParameters Dynamic parameters.
+     * @param mapOnBackups Whether to use non-primary replicas for mapping or 
not.
      *
      * @return Mapping parameters.
      */
-    public static MappingParameters create(Object[] dynamicParameters) {
+    public static MappingParameters create(Object[] dynamicParameters, boolean 
mapOnBackups) {
         if (dynamicParameters.length == 0) {
             return EMPTY;
         } else {
-            return new MappingParameters(dynamicParameters);
+            return new MappingParameters(dynamicParameters, mapOnBackups);
         }
     }
 
-    private MappingParameters(Object[] dynamicParameters) {
+    private MappingParameters(
+            Object[] dynamicParameters,
+            boolean mapOnBackups
+    ) {
         this.dynamicParameters = dynamicParameters;
+        this.mapOnBackups = mapOnBackups;
     }
 
     /** Values of dynamic parameters. */
-    public Object[] dynamicParameters() {
+    Object[] dynamicParameters() {
         return dynamicParameters;
     }
+
+    boolean mapOnBackups() {
+        return mapOnBackups;
+    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
index 7c28ac249e..bcdf364850 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
@@ -18,20 +18,18 @@
 package org.apache.ignite.internal.sql.engine.exec.mapping;
 
 import static java.util.concurrent.CompletableFuture.allOf;
-import static org.apache.ignite.internal.util.CollectionUtils.first;
-import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import it.unimi.dsi.fastutil.ints.IntObjectPair;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
-import it.unimi.dsi.fastutil.longs.Long2LongOpenHashMap;
 import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
 import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -41,7 +39,8 @@ import org.apache.calcite.plan.RelOptCluster;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
-import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import 
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import 
org.apache.ignite.internal.sql.engine.exec.mapping.MappingServiceImpl.LogicalTopologyHolder.TopologySnapshot;
@@ -55,7 +54,6 @@ import 
org.apache.ignite.internal.sql.engine.schema.IgniteDataSource;
 import org.apache.ignite.internal.sql.engine.util.cache.Cache;
 import org.apache.ignite.internal.sql.engine.util.cache.CacheFactory;
 import org.apache.ignite.internal.util.CompletableFutures;
-import org.apache.ignite.lang.ErrorGroups.Sql;
 
 /**
  * An implementation of {@link MappingService}.
@@ -64,15 +62,14 @@ import org.apache.ignite.lang.ErrorGroups.Sql;
  * Always uses latest topology snapshot to map query.
  */
 public class MappingServiceImpl implements MappingService, 
LogicalTopologyEventListener {
-    private static final int MAPPING_ATTEMPTS = 3;
-
     private final LogicalTopologyHolder topologyHolder = new 
LogicalTopologyHolder();
     private final CompletableFuture<Void> initialTopologyFuture = new 
CompletableFuture<>();
 
     private final String localNodeName;
+    private final ClockService clock;
     private final ExecutionTargetProvider targetProvider;
     private final Cache<PlanId, FragmentsTemplate> templatesCache;
-    private final Cache<PlanId, MappingsCacheValue> mappingsCache;
+    private final Cache<MappingsCacheKey, MappingsCacheValue> mappingsCache;
     private final Executor taskExecutor;
     private final PartitionPruner partitionPruner;
 
@@ -80,6 +77,7 @@ public class MappingServiceImpl implements MappingService, 
LogicalTopologyEventL
      * Constructor.
      *
      * @param localNodeName Name of the current Ignite node.
+     * @param clock Clock service used to obtain the current time.
      * @param targetProvider Execution target provider.
      * @param cacheFactory A factory to create cache of fragments.
      * @param cacheSize Size of the cache of query plans. Should be non 
negative.
@@ -88,6 +86,7 @@ public class MappingServiceImpl implements MappingService, 
LogicalTopologyEventL
      */
     public MappingServiceImpl(
             String localNodeName,
+            ClockService clock,
             ExecutionTargetProvider targetProvider,
             CacheFactory cacheFactory,
             int cacheSize,
@@ -95,6 +94,7 @@ public class MappingServiceImpl implements MappingService, 
LogicalTopologyEventL
             Executor taskExecutor
     ) {
         this.localNodeName = localNodeName;
+        this.clock = clock;
         this.targetProvider = targetProvider;
         this.templatesCache = cacheFactory.create(cacheSize);
         this.mappingsCache = cacheFactory.create(cacheSize);
@@ -130,41 +130,50 @@ public class MappingServiceImpl implements 
MappingService, LogicalTopologyEventL
 
         FragmentsTemplate template = getOrCreateTemplate(multiStepPlan, 
context);
 
-        MappingsCacheValue cacheValue = 
mappingsCache.compute(multiStepPlan.id(), (key, val) -> {
-            if (val == null) {
-                IntSet tableIds = new IntOpenHashSet();
-                boolean topologyAware = false;
-
-                for (Fragment fragment : template.fragments) {
-                    topologyAware = topologyAware || 
!fragment.systemViews().isEmpty();
-                    for (IgniteDataSource source : fragment.tables().values()) 
{
-                        tableIds.add(source.id());
-                    }
-                }
+        boolean mapOnBackups = parameters.mapOnBackups();
+        MappingsCacheValue cacheValue = mappingsCache.compute(
+                new MappingsCacheKey(multiStepPlan.id(), mapOnBackups),
+                (key, val) -> {
+                    if (val == null) {
+                        IntSet tableIds = new IntOpenHashSet();
+                        boolean topologyAware = false;
+
+                        for (Fragment fragment : template.fragments) {
+                            topologyAware = topologyAware || 
!fragment.systemViews().isEmpty();
+                            for (IgniteDataSource source : 
fragment.tables().values()) {
+                                tableIds.add(source.id());
+                            }
+                        }
 
-                long topVer = topologyAware ? topology.version() : 
Long.MAX_VALUE;
+                        long topVer = topologyAware ? topology.version() : 
Long.MAX_VALUE;
 
-                return new MappingsCacheValue(topVer, tableIds, 
mapFragments(context, template));
-            }
+                        return new MappingsCacheValue(topVer, tableIds, 
mapFragments(context, template, key.mapOnBackups));
+                    }
 
-            if (val.topVer < topology.version()) {
-                return new MappingsCacheValue(topology.version(), 
val.tableIds, mapFragments(context, template));
-            }
+                    if (val.topVer < topology.version()) {
+                        return new MappingsCacheValue(topology.version(), 
val.tableIds, mapFragments(context, template, key.mapOnBackups));
+                    }
 
-            return val;
-        });
+                    return val;
+                }
+        );
 
         return cacheValue.mappedFragments.thenApply(mappedFragments -> 
applyPartitionPruning(mappedFragments.fragments, parameters));
     }
 
-    private CompletableFuture<MappedFragments> mapFragments(MappingContext 
context, FragmentsTemplate template) {
+    private CompletableFuture<MappedFragments> mapFragments(
+            MappingContext context,
+            FragmentsTemplate template,
+            boolean mapOnBackups
+    ) {
         IdGenerator idGenerator = new IdGenerator(template.nextId);
         List<Fragment> fragments = new ArrayList<>(template.fragments);
+        HybridTimestamp mappingTime = clock.now();
 
         List<CompletableFuture<IntObjectPair<ExecutionTarget>>> targets =
                 fragments.stream().flatMap(fragment -> Stream.concat(
                         fragment.tables().values().stream()
-                                .map(table -> 
targetProvider.forTable(context.targetFactory(), table)
+                                .map(table -> 
targetProvider.forTable(mappingTime, context.targetFactory(), table, 
mapOnBackups)
                                         .thenApply(target -> 
IntObjectPair.of(table.id(), target))
                                 ),
                         fragment.systemViews().stream()
@@ -187,67 +196,35 @@ public class MappingServiceImpl implements 
MappingService, LogicalTopologyEventL
 
                     FragmentMapper mapper = new 
FragmentMapper(template.cluster.getMetadataQuery(), context, targetsById);
 
-                    Long2ObjectMap<FragmentMapping> mappingByFragmentId = new 
Long2ObjectOpenHashMap<>();
+                    List<FragmentMapping> mappings = mapper.map(fragments, 
idGenerator);
+
                     Long2ObjectMap<ColocationGroup> groupsBySourceId = new 
Long2ObjectOpenHashMap<>();
                     Long2ObjectMap<List<String>> allSourcesByExchangeId = new 
Long2ObjectOpenHashMap<>();
-                    Exception ex = null;
-                    boolean lastAttemptSucceed = false;
-                    List<Fragment> fragmentsToMap = fragments;
-                    for (int attempt = 0; attempt < MAPPING_ATTEMPTS && 
!lastAttemptSucceed; attempt++) {
-                        Fragment currentFragment = null;
-                        try {
-                            for (Fragment fragment : fragmentsToMap) {
-                                currentFragment = fragment;
-
-                                if 
(mappingByFragmentId.containsKey(fragment.fragmentId())) {
-                                    continue;
-                                }
-
-                                FragmentMapping mapping = mapper.map(fragment);
-
-                                mappingByFragmentId.put(fragment.fragmentId(), 
mapping);
-                                for (ColocationGroup group : mapping.groups()) 
{
-                                    for (long sourceId : group.sourceIds()) {
-                                        groupsBySourceId.put(sourceId, group);
-                                    }
-                                }
-
-                                if (!fragment.rootFragment()) {
-                                    IgniteSender sender = (IgniteSender) 
fragment.root();
-
-                                    List<String> nodeNames = 
mapping.groups().stream()
-                                            .flatMap(g -> 
g.nodeNames().stream())
-                                            
.distinct().collect(Collectors.toList());
-
-                                    
allSourcesByExchangeId.put(sender.exchangeId(), nodeNames);
-                                }
 
-                            }
+                    for (FragmentMapping mapping : mappings) {
+                        Fragment fragment = mapping.fragment();
 
-                            lastAttemptSucceed = true;
-                        } catch (FragmentMappingException mappingException) {
-                            if (ex == null) {
-                                ex = mappingException;
-                            } else {
-                                ex.addSuppressed(mappingException);
+                        for (ColocationGroup group : mapping.groups()) {
+                            for (long sourceId : group.sourceIds()) {
+                                groupsBySourceId.put(sourceId, group);
                             }
-
-                            fragmentsToMap = replace(
-                                    fragmentsToMap,
-                                    currentFragment,
-                                    new FragmentSplitter(idGenerator, 
mappingException.node()).go(currentFragment)
-                            );
                         }
-                    }
 
-                    if (!lastAttemptSucceed) {
-                        throw new IgniteInternalException(Sql.MAPPING_ERR, 
"Unable to map query: " + ex.getMessage(), ex);
+                        if (!fragment.rootFragment()) {
+                            IgniteSender sender = (IgniteSender) 
fragment.root();
+
+                            List<String> nodeNames = mapping.groups().stream()
+                                    .flatMap(g -> g.nodeNames().stream())
+                                    .distinct().collect(Collectors.toList());
+
+                            allSourcesByExchangeId.put(sender.exchangeId(), 
nodeNames);
+                        }
                     }
 
-                    List<MappedFragment> mappedFragmentsList = new 
ArrayList<>(fragmentsToMap.size());
+                    List<MappedFragment> mappedFragmentsList = new 
ArrayList<>(mappings.size());
                     Set<String> targetNodes = new HashSet<>();
-                    for (Fragment fragment : fragmentsToMap) {
-                        FragmentMapping mapping = 
mappingByFragmentId.get(fragment.fragmentId());
+                    for (FragmentMapping mapping : mappings) {
+                        Fragment fragment = mapping.fragment();
 
                         ColocationGroup targetGroup = null;
                         if (!fragment.rootFragment()) {
@@ -293,14 +270,9 @@ public class MappingServiceImpl implements MappingService, 
LogicalTopologyEventL
     public void onNodeLeft(LogicalNode leftNode, LogicalTopologySnapshot 
newTopology) {
         topologyHolder.update(newTopology);
 
-        mappingsCache.removeIfValue(value -> {
-            if (value.mappedFragments.isDone()) {
-                return 
value.mappedFragments.join().nodes.contains(leftNode.name());
-            }
-
-            // Invalidate non-completed mappings to reduce the chance of 
getting stale value
-            return true;
-        });
+        mappingsCache.removeIfValue(value ->
+                !value.mappedFragments.isDone() // Invalidate non-completed 
mappings to reduce the chance of getting a stale value 
+                        || 
value.mappedFragments.join().nodes.contains(leftNode.name()));
     }
 
     @Override
@@ -308,46 +280,6 @@ public class MappingServiceImpl implements MappingService, 
LogicalTopologyEventL
         topologyHolder.update(newTopology);
     }
 
-    private static List<Fragment> replace(
-            List<Fragment> originalFragments,
-            Fragment fragmentToReplace,
-            List<Fragment> replacement
-    ) {
-        assert !nullOrEmpty(replacement);
-
-        Long2LongOpenHashMap newTargets = new Long2LongOpenHashMap();
-        for (Fragment fragment0 : replacement) {
-            for (IgniteReceiver remote : fragment0.remotes()) {
-                newTargets.put(remote.exchangeId(), fragment0.fragmentId());
-            }
-        }
-
-        List<Fragment> newFragments = new ArrayList<>(originalFragments.size() 
+ replacement.size() - 1);
-        for (Fragment fragment : originalFragments) {
-            if (fragment == fragmentToReplace) {
-                fragment = first(replacement);
-            } else if (!fragment.rootFragment()) {
-                IgniteSender sender = (IgniteSender) fragment.root();
-
-                long newTargetId = 
newTargets.getOrDefault(sender.exchangeId(), Long.MIN_VALUE);
-
-                if (newTargetId != Long.MIN_VALUE) {
-                    sender = new IgniteSender(sender.getCluster(), 
sender.getTraitSet(),
-                            sender.getInput(), sender.exchangeId(), 
newTargetId, sender.distribution());
-
-                    fragment = new Fragment(fragment.fragmentId(), 
fragment.correlated(), sender,
-                            fragment.remotes(), fragment.tables(), 
fragment.systemViews());
-                }
-            }
-
-            newFragments.add(fragment);
-        }
-
-        newFragments.addAll(replacement.subList(1, replacement.size()));
-
-        return newFragments;
-    }
-
     private List<MappedFragment> applyPartitionPruning(List<MappedFragment> 
mappedFragments, MappingParameters parameters) {
         return partitionPruner.apply(mappedFragments, 
parameters.dynamicParameters());
     }
@@ -448,4 +380,33 @@ public class MappingServiceImpl implements MappingService, 
LogicalTopologyEventL
             this.mappedFragments = mappedFragments;
         }
     }
+
+    private static class MappingsCacheKey {
+        private final PlanId planId;
+        private final boolean mapOnBackups;
+
+        MappingsCacheKey(PlanId planId, boolean mapOnBackups) {
+            this.planId = planId;
+            this.mapOnBackups = mapOnBackups;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            MappingsCacheKey that = (MappingsCacheKey) o;
+            return mapOnBackups == that.mapOnBackups && Objects.equals(planId, 
that.planId);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(planId, mapOnBackups);
+        }
+    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AllOfTarget.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AllOfTarget.java
index 8435366979..d2bae3afcf 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AllOfTarget.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AllOfTarget.java
@@ -49,6 +49,11 @@ class AllOfTarget extends AbstractTarget {
         return ((AbstractTarget) other).colocate(this);
     }
 
+    @Override
+    public ExecutionTarget trimTo(ExecutionTarget other) {
+        return this;
+    }
+
     @Override
     ExecutionTarget colocate(AllOfTarget other) throws 
ColocationMappingException {
         return colocate(this, other);
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/OneOfTarget.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/OneOfTarget.java
index 500a5026c3..3098c7dd97 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/OneOfTarget.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/OneOfTarget.java
@@ -56,6 +56,21 @@ class OneOfTarget extends AbstractTarget {
         return ((AbstractTarget) other).colocate(this);
     }
 
+    @Override
+    public ExecutionTarget trimTo(ExecutionTarget other) {
+        assert other instanceof AbstractTarget : other == null ? "<null>" : 
other.getClass().getCanonicalName();
+
+        long otherNodes = ((AbstractTarget) other).nodes;
+
+        long newNodes = nodes & otherNodes;
+
+        if (newNodes == nodes || newNodes == 0) {
+            return this;
+        }
+
+        return new OneOfTarget(newNodes);
+    }
+
     @Override
     public ExecutionTarget colocate(AllOfTarget other) throws 
ColocationMappingException {
         return colocate(other, this);
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/PartitionedTarget.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/PartitionedTarget.java
index aee148f04e..9a0680eb3e 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/PartitionedTarget.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/PartitionedTarget.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.sql.engine.exec.mapping.smallcluster;
 
+import it.unimi.dsi.fastutil.ints.Int2LongFunction;
 import java.util.List;
 import 
org.apache.ignite.internal.sql.engine.exec.mapping.ColocationMappingException;
 import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
@@ -67,6 +68,56 @@ class PartitionedTarget extends AbstractTarget {
         return ((AbstractTarget) other).colocate(this);
     }
 
+    @Override
+    public ExecutionTarget trimTo(ExecutionTarget other) {
+        assert other instanceof AbstractTarget : other == null ? "<null>" : 
other.getClass().getCanonicalName();
+
+        if (finalised) {
+            return this;
+        }
+
+        long[] newPartitionsNodes = new long[partitionsNodes.length];
+        boolean changed = false;
+        Int2LongFunction partitionNodesResolver = partitionNodeResolver(other);
+
+        for (int i = 0; i < partitionsNodes.length; i++) {
+            long newNodes = partitionsNodes[i] & partitionNodesResolver.get(i);
+
+            if (newNodes == 0) {
+                newNodes = partitionsNodes[i];
+            }
+
+            if (newNodes != partitionsNodes[i]) {
+                changed = true;
+            }
+
+            newPartitionsNodes[i] = newNodes;
+        }
+
+        if (changed) {
+            return new PartitionedTarget(false, newPartitionsNodes, 
enlistmentConsistencyTokens);
+        }
+
+        return this;
+    }
+
+    private Int2LongFunction partitionNodeResolver(ExecutionTarget other) {
+        Int2LongFunction partitionNodesResolver;
+
+        if (other instanceof PartitionedTarget 
+                && ((PartitionedTarget) other).partitionsNodes.length == 
partitionsNodes.length) {
+            PartitionedTarget otherPartitioned = (PartitionedTarget) other;
+
+            partitionNodesResolver = partId -> 
otherPartitioned.partitionsNodes[partId];
+        } else {
+            long otherNodes = ((AbstractTarget) other).nodes;
+
+            partitionNodesResolver = partId -> otherNodes;
+        }
+
+        return partitionNodesResolver;
+    }
+
     @Override
     ExecutionTarget colocate(AllOfTarget other) throws 
ColocationMappingException {
         return colocate(other, this);
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
index 9a1eadae31..e369c311d3 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
@@ -21,6 +21,9 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import it.unimi.dsi.fastutil.objects.Object2LongMap;
 import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
 import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.affinity.TokenizedAssignments;
 import org.apache.ignite.internal.sql.engine.exec.NodeWithConsistencyToken;
 import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
 import 
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory;
@@ -38,12 +41,13 @@ public class SmallClusterFactory implements 
ExecutionTargetFactory {
             throw new IllegalArgumentException("Supported up to 64 nodes, but 
was " + nodes.size());
         }
 
-        this.nodes = nodes;
+        // Sort the node names so that the resulting mapping is deterministic.
+        this.nodes = nodes.stream().sorted().collect(Collectors.toList());
 
         nodeNameToId = new Object2LongOpenHashMap<>(nodes.size());
 
         int idx = 0;
-        for (String name : nodes) {
+        for (String name : this.nodes) {
             nodeNameToId.putIfAbsent(name, 1L << idx++);
         }
     }
@@ -64,17 +68,24 @@ public class SmallClusterFactory implements 
ExecutionTargetFactory {
     }
 
     @Override
-    public ExecutionTarget partitioned(List<NodeWithConsistencyToken> nodes) {
-        long[] partitionNodes = new long[nodes.size()];
-        long[] enlistmentConsistencyTokens = new long[nodes.size()];
+    public ExecutionTarget partitioned(List<TokenizedAssignments> assignments) 
{
+        long[] partitionNodes = new long[assignments.size()];
+        long[] enlistmentConsistencyTokens = new long[assignments.size()];
 
         int idx = 0;
-        for (NodeWithConsistencyToken e : nodes) {
-            partitionNodes[idx] = nodeNameToId.getOrDefault(e.name(), 0);
-            enlistmentConsistencyTokens[idx++] = 
e.enlistmentConsistencyToken();
+        boolean finalised = true;
+        for (TokenizedAssignments assignment : assignments) {
+            finalised = finalised && assignment.nodes().size() < 2;
+
+            for (Assignment a : assignment.nodes()) {
+                partitionNodes[idx] |= 
nodeNameToId.getOrDefault(a.consistentId(), 0);
+                enlistmentConsistencyTokens[idx] = assignment.token();
+            }
+
+            idx++;
         }
 
-        return new PartitionedTarget(true, partitionNodes, 
enlistmentConsistencyTokens);
+        return new PartitionedTarget(finalised, partitionNodes, 
enlistmentConsistencyTokens);
     }
 
     @Override
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SomeOfTarget.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SomeOfTarget.java
index 199c8758ef..8810657f31 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SomeOfTarget.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SomeOfTarget.java
@@ -49,6 +49,21 @@ class SomeOfTarget extends AbstractTarget {
         return ((AbstractTarget) other).colocate(this);
     }
 
+    @Override
+    public ExecutionTarget trimTo(ExecutionTarget other) {
+        assert other instanceof AbstractTarget : other == null ? "<null>" : 
other.getClass().getCanonicalName();
+
+        long otherNodes = ((AbstractTarget) other).nodes;
+
+        long newNodes = nodes & otherNodes;
+
+        if (newNodes == nodes || newNodes == 0) {
+            return this;
+        }
+
+        return new SomeOfTarget(newNodes);
+    }
+
     @Override
     ExecutionTarget colocate(AllOfTarget other) throws 
ColocationMappingException {
         return colocate(other, this);
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Fragment.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Fragment.java
index f04c563144..653bd6aa60 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Fragment.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Fragment.java
@@ -31,6 +31,7 @@ import 
org.apache.ignite.internal.sql.engine.schema.IgniteTable;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.tostring.IgniteToStringExclude;
 import org.apache.ignite.internal.tostring.S;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Fragment of distributed query.
@@ -123,6 +124,15 @@ public class Fragment {
         return !(root instanceof IgniteSender);
     }
 
+    /** Returns the id of the target fragment for non-root fragments, returns 
{@code null} otherwise. */
+    public @Nullable Long targetFragmentId() {
+        if (root instanceof IgniteSender) {
+            return ((IgniteSender) root).targetFragmentId();
+        }
+
+        return null;
+    }
+
     public boolean single() {
         return rootFragment() || (root instanceof IgniteSender
                 && ((IgniteSender) 
root).sourceDistribution().satisfies(IgniteDistributions.single()));
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index 5926253c80..feac21c78a 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -73,6 +73,7 @@ import 
org.apache.ignite.internal.failure.handlers.StopNodeFailureHandler;
 import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.hlc.TestClockService;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.RunnableX;
@@ -873,24 +874,7 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
 
         ExecutionDependencyResolver dependencyResolver = new 
ExecutionDependencyResolverImpl(executableTableRegistry, null);
 
-        var targetProvider = new ExecutionTargetProvider() {
-            @Override
-            public CompletableFuture<ExecutionTarget> 
forTable(ExecutionTargetFactory factory, IgniteTable table) {
-                if (mappingException != null) {
-                    return CompletableFuture.failedFuture(mappingException);
-                }
-
-                return completedFuture(factory.allOf(nodeNames));
-            }
-
-            @Override
-            public CompletableFuture<ExecutionTarget> 
forSystemView(ExecutionTargetFactory factory, IgniteSystemView view) {
-                return CompletableFuture.failedFuture(new AssertionError("Not 
supported"));
-            }
-        };
-
-        var partitionPruner = new PartitionPrunerImpl();
-        var mappingService = new MappingServiceImpl(nodeName, targetProvider, 
EmptyCacheFactory.INSTANCE, 0, partitionPruner, taskExecutor);
+        var mappingService = createMappingService(nodeName, clockService, 
taskExecutor);
         var tableFunctionRegistry = new TableFunctionRegistryImpl();
 
         List<LogicalNode> logicalNodes = nodeNames.stream()
@@ -921,6 +905,36 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
         return executionService;
     }
 
+    private MappingServiceImpl createMappingService(
+            String nodeName,
+            ClockService clock,
+            QueryTaskExecutorImpl taskExecutor
+    ) {
+        var targetProvider = new ExecutionTargetProvider() {
+            @Override
+            public CompletableFuture<ExecutionTarget> forTable(
+                    HybridTimestamp operationTime,
+                    ExecutionTargetFactory factory,
+                    IgniteTable table,
+                    boolean includeBackups
+            ) {
+                if (mappingException != null) {
+                    return CompletableFuture.failedFuture(mappingException);
+                }
+
+                return completedFuture(factory.allOf(nodeNames));
+            }
+
+            @Override
+            public CompletableFuture<ExecutionTarget> 
forSystemView(ExecutionTargetFactory factory, IgniteSystemView view) {
+                return CompletableFuture.failedFuture(new AssertionError("Not 
supported"));
+            }
+        };
+
+        var partitionPruner = new PartitionPrunerImpl();
+        return new MappingServiceImpl(nodeName, clock, targetProvider, 
EmptyCacheFactory.INSTANCE, 0, partitionPruner, taskExecutor);
+    }
+
     private SqlOperationContext createContext() {
         return createContext(null);
     }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java
index 27078417d2..d5fa8bf400 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java
@@ -32,6 +32,8 @@ import java.util.Map;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.stream.Collectors;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.sql.SqlExplainLevel;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
 import org.apache.ignite.internal.sql.SqlCommon;
@@ -93,7 +95,7 @@ public class FragmentMappingTest extends AbstractPlannerTest {
 
     private final TreeSet<String> nodeNames = new TreeSet<>();
 
-    private final TreeMap<String, Pair<IgniteDistribution, List<String>>> 
tables = new TreeMap<>();
+    private final TreeMap<String, Pair<IgniteDistribution, 
List<List<String>>>> tables = new TreeMap<>();
 
     private final Map<String, Integer> tableRows = new HashMap<>();
 
@@ -258,6 +260,17 @@ public class FragmentMappingTest extends 
AbstractPlannerTest {
         testRunner.runTest(this::initSchema, "test_partition_pruning.test");
     }
 
+    @Test
+    void backupPartitionsMapping() {
+        addNodes("N0", "N1", "N2", "N3");
+
+        addTable("T1", List.of(List.of("N0", "N1"), List.of("N1", "N2")));
+        addTable("T2", List.of(List.of("N0", "N1", "N2")));
+        addTable("T3", List.of(List.of("N1", "N2")));
+
+        testRunner.runTest(this::initSchema, "test_backup_mapping.test");
+    }
+
     private void addNodes(String node, String... otherNodes) {
         this.nodeNames.add(node);
         this.nodeNames.addAll(Arrays.asList(otherNodes));
@@ -269,14 +282,38 @@ public class FragmentMappingTest extends 
AbstractPlannerTest {
         nodeNames.addAll(Arrays.asList(otherNodes));
 
         String tableName = formatName(name, nodeNames);
-        tables.put(tableName, new Pair<>(IgniteDistributions.affinity(-1, -1, 
-1), new ArrayList<>(nodeNames)));
+        tables.put(
+                tableName,
+                new Pair<>(
+                        IgniteDistributions.affinity(-1, -1, -1),
+                        nodeNames.stream()
+                                .map(List::of)
+                                .collect(Collectors.toList())
+                )
+        );
+    }
+
+    private void addTable(String name, List<List<String>> assignments) {
+        String tableName = name;
+
+        for (List<String> partitionNodes : assignments) {
+            tableName = formatName(tableName, new TreeSet<>(partitionNodes));
+        }
+
+        tables.put(
+                tableName,
+                new Pair<>(
+                        IgniteDistributions.affinity(-1, -1, -1),
+                        assignments
+                )
+        );
     }
 
     private void addTableSingle(String name, String... nodes) {
         TreeSet<String> nodeNames = new TreeSet<>(Arrays.asList(nodes));
         String tableName = formatName(name, nodeNames);
 
-        tables.put(tableName, new Pair<>(IgniteDistributions.single(), new 
ArrayList<>(nodeNames)));
+        tables.put(tableName, new Pair<>(IgniteDistributions.single(), 
List.of(new ArrayList<>(nodeNames))));
     }
 
     private void addTableIdent(String name, String node, String... otherNodes) 
{
@@ -285,7 +322,15 @@ public class FragmentMappingTest extends 
AbstractPlannerTest {
         nodeNames.addAll(Arrays.asList(otherNodes));
 
         String tableName = formatName(name, nodeNames);
-        tables.put(tableName, new Pair<>(IgniteDistributions.identity(0), new 
ArrayList<>(nodeNames)));
+        tables.put(
+                tableName,
+                new Pair<>(
+                        IgniteDistributions.identity(0),
+                        nodeNames.stream()
+                                .map(List::of)
+                                .collect(Collectors.toList())
+                )
+        );
     }
 
     private void setRowCount(String tableName, int rowCount) {
@@ -304,6 +349,10 @@ public class FragmentMappingTest extends 
AbstractPlannerTest {
             try (IgnitePlanner planner = ctx.planner()) {
                 assertNotNull(planner);
 
+                var plan = physicalPlan(planner, ctx.query());
+
+                System.out.println(RelOptUtil.toString(plan, 
SqlExplainLevel.ALL_ATTRIBUTES));
+
                 return physicalPlan(planner, ctx.query());
             }
         } catch (Exception e) {
@@ -316,25 +365,17 @@ public class FragmentMappingTest extends 
AbstractPlannerTest {
         List<IgniteDataSource> dataSources = new ArrayList<>();
         int objectId = 1;
 
-        Map<String, List<String>> table2NodeNames = new HashMap<>();
+        Map<String, List<List<String>>> table2Assignments = new HashMap<>();
 
-        for (Map.Entry<String, Pair<IgniteDistribution, List<String>>> e : 
tables.entrySet()) {
+        for (Map.Entry<String, Pair<IgniteDistribution, List<List<String>>>> e 
: tables.entrySet()) {
             String tableName = e.getKey();
             String tableShortName = tableName.substring(0, 
tableName.indexOf('_'));
             // Generate distinct row counts for each table to ensure that the 
optimizer produces the same results.
-            int tableSize = tableRows.getOrDefault(tableShortName, 100 + 
objectId);
+            int tableSize = tableRows.getOrDefault(tableShortName, 100_000 + 
objectId);
 
-            List<String> tableNodeNames = e.getValue().getSecond();
+            List<List<String>> assignments = e.getValue().getSecond();
 
-            for (String tableNodeName : tableNodeNames) {
-                if (!nodeNames.contains(tableNodeName)) {
-                    String message = format(
-                            "Expected node {} for table {}. Registered nodes: 
{}",
-                            tableNodeName, tableShortName, nodeNames
-                    );
-                    throw new IllegalArgumentException(message);
-                }
-            }
+            validateAssignments(tableShortName, assignments);
 
             IgniteDistribution distribution = e.getValue().getFirst();
 
@@ -350,7 +391,7 @@ public class FragmentMappingTest extends 
AbstractPlannerTest {
 
             IgniteDistribution distributionToUse;
             if (distribution.function() instanceof AffinityDistribution) {
-                distributionToUse = IgniteDistributions.affinity(0, objectId, 
1);
+                distributionToUse = IgniteDistributions.affinity(0, objectId, 
objectId);
             } else {
                 distributionToUse = distribution;
             }
@@ -361,12 +402,12 @@ public class FragmentMappingTest extends 
AbstractPlannerTest {
                     .size(tableSize)
                     .tableId(objectId)
                     .distribution(distributionToUse)
-                    .partitions(tableNodeNames.size())
+                    .partitions(assignments.size())
                     .build();
 
             dataSources.add(testTable);
 
-            table2NodeNames.put(tableName, tableNodeNames);
+            table2Assignments.put(tableName, assignments);
 
             objectId += 1;
         }
@@ -374,13 +415,27 @@ public class FragmentMappingTest extends 
AbstractPlannerTest {
         IgniteSchema schema = new IgniteSchema(SqlCommon.DEFAULT_SCHEMA_NAME, 
1, dataSources);
         ExecutionTargetProvider executionTargetProvider = 
TestBuilders.executionTargetProviderBuilder()
                 .useTablePartitions(true)
-                .addTables(table2NodeNames)
+                .addTables(table2Assignments)
                 .build();
         LogicalTopologySnapshot logicalTopologySnapshot = newLogicalTopology();
 
         return new TestSetup(executionTargetProvider, schema, 
logicalTopologySnapshot);
     }
 
+    private void validateAssignments(String tableName, List<List<String>> 
assignments) {
+        for (List<String> partitionAssignments : assignments) {
+            for (String tableNodeName : partitionAssignments) {
+                if (!nodeNames.contains(tableNodeName)) {
+                    String message = format(
+                            "Expected node {} for table {}. Registered nodes: 
{}",
+                            tableNodeName, tableName, nodeNames
+                    );
+                    throw new IllegalArgumentException(message);
+                }
+            }
+        }
+    }
+
     private static String formatName(String name, TreeSet<String> nodeNames) {
         return name + "_" + String.join("", nodeNames);
     }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
index 4979f14057..727eb7b023 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
@@ -26,6 +26,7 @@ import static org.junit.jupiter.api.Assertions.assertNotSame;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
@@ -38,12 +39,15 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import org.apache.ignite.internal.TestHybridClock;
 import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.hlc.TestClockService;
 import 
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
@@ -71,6 +75,7 @@ public class MappingServiceImplTest extends 
BaseIgniteAbstractTest {
     private static final MultiStepPlan PLAN;
     private static final MultiStepPlan PLAN_WITH_SYSTEM_VIEW;
     private static final TestCluster cluster;
+    private static final ClockService CLOCK_SERVICE = new TestClockService(new 
TestHybridClock(System::currentTimeMillis));
     private static final MappingParameters PARAMS = MappingParameters.EMPTY;
     private static final PartitionPruner PARTITION_PRUNER = (fragments, 
dynParams) -> fragments;
 
@@ -179,8 +184,9 @@ public class MappingServiceImplTest extends 
BaseIgniteAbstractTest {
 
         // Initialize mapping service.
         ExecutionTargetProvider targetProvider = 
Mockito.spy(createTargetProvider(nodeNames));
-        MappingServiceImpl mappingService =
-                new MappingServiceImpl(localNodeName, targetProvider, 
CaffeineCacheFactory.INSTANCE, 100, PARTITION_PRUNER, Runnable::run);
+        MappingServiceImpl mappingService = new MappingServiceImpl(
+                localNodeName, CLOCK_SERVICE, targetProvider, 
CaffeineCacheFactory.INSTANCE, 100, PARTITION_PRUNER, Runnable::run
+        );
 
         mappingService.onNodeJoined(Mockito.mock(LogicalNode.class),
                 new LogicalTopologySnapshot(1, 
logicalNodes(nodeNames.toArray(new String[0]))));
@@ -188,7 +194,7 @@ public class MappingServiceImplTest extends 
BaseIgniteAbstractTest {
         List<MappedFragment> tableOnlyMapping = await(mappingService.map(PLAN, 
PARAMS));
         List<MappedFragment> sysViewMapping = 
await(mappingService.map(PLAN_WITH_SYSTEM_VIEW, PARAMS));
 
-        verify(targetProvider, times(2)).forTable(any(), any());
+        verify(targetProvider, times(2)).forTable(any(), any(), any(), 
anyBoolean());
         verify(targetProvider, times(1)).forSystemView(any(), any());
 
         assertSame(tableOnlyMapping, await(mappingService.map(PLAN, PARAMS)));
@@ -222,7 +228,7 @@ public class MappingServiceImplTest extends 
BaseIgniteAbstractTest {
         // Plan with tables that include left node must be invalidated.
         assertNotSame(tableOnlyMapping, await(mappingService.map(PLAN, 
PARAMS)));
 
-        verify(targetProvider, times(4)).forTable(any(), any());
+        verify(targetProvider, times(4)).forTable(any(), any(), any(), 
anyBoolean());
         verify(targetProvider, times(2)).forSystemView(any(), any());
     }
 
@@ -248,14 +254,15 @@ public class MappingServiceImplTest extends 
BaseIgniteAbstractTest {
 
         // Initialize mapping service.
         ExecutionTargetProvider targetProvider = 
Mockito.spy(createTargetProvider(nodeNames));
-        MappingServiceImpl mappingService =
-                new MappingServiceImpl(localNodeName, targetProvider, 
CaffeineCacheFactory.INSTANCE, 100, PARTITION_PRUNER, Runnable::run);
+        MappingServiceImpl mappingService = new MappingServiceImpl(
+                localNodeName, CLOCK_SERVICE, targetProvider, 
CaffeineCacheFactory.INSTANCE, 100, PARTITION_PRUNER, Runnable::run
+        );
 
         mappingService.onNodeJoined(Mockito.mock(LogicalNode.class),
                 new LogicalTopologySnapshot(1, 
logicalNodes(nodeNames.toArray(new String[0]))));
 
         List<MappedFragment> mappedFragments = 
await(mappingService.map(PLAN_WITH_SYSTEM_VIEW, PARAMS));
-        verify(targetProvider, times(1)).forTable(any(), any());
+        verify(targetProvider, times(1)).forTable(any(), any(), any(), 
anyBoolean());
         verify(targetProvider, times(1)).forSystemView(any(), any());
 
         // Simulate expiration of the primary replica for non-mapped table - 
the cache entry should not be invalidated.
@@ -266,7 +273,7 @@ public class MappingServiceImplTest extends 
BaseIgniteAbstractTest {
         // Simulate expiration of the primary replica for mapped table - the 
cache entry should be invalidated.
         
await(mappingService.onPrimaryReplicaExpired(prepareEvtParams.apply("T1")));
         assertNotSame(mappedFragments, 
await(mappingService.map(PLAN_WITH_SYSTEM_VIEW, PARAMS)));
-        verify(targetProvider, times(2)).forTable(any(), any());
+        verify(targetProvider, times(2)).forTable(any(), any(), any(), 
anyBoolean());
         verify(targetProvider, times(2)).forSystemView(any(), any());
     }
 
@@ -279,6 +286,7 @@ public class MappingServiceImplTest extends 
BaseIgniteAbstractTest {
     private static MappingServiceImpl createMappingServiceNoCache(String 
localNodeName, List<String> nodeNames) {
         return new MappingServiceImpl(
                 localNodeName,
+                CLOCK_SERVICE,
                 createTargetProvider(nodeNames),
                 EmptyCacheFactory.INSTANCE,
                 0,
@@ -290,7 +298,12 @@ public class MappingServiceImplTest extends 
BaseIgniteAbstractTest {
     private static ExecutionTargetProvider createTargetProvider(List<String> 
nodeNames) {
         return new ExecutionTargetProvider() {
             @Override
-            public CompletableFuture<ExecutionTarget> 
forTable(ExecutionTargetFactory factory, IgniteTable table) {
+            public CompletableFuture<ExecutionTarget> forTable(
+                    HybridTimestamp operationTime,
+                    ExecutionTargetFactory factory,
+                    IgniteTable table,
+                    boolean includeBackups
+            ) {
                 return 
CompletableFuture.completedFuture(factory.allOf(nodeNames));
             }
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunner.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunner.java
index 94d1518959..5ea7359095 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunner.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunner.java
@@ -36,7 +36,9 @@ import java.util.concurrent.CompletionException;
 import java.util.function.BiFunction;
 import java.util.function.Supplier;
 import org.apache.calcite.plan.RelOptUtil;
+import org.apache.ignite.internal.TestHybridClock;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.hlc.TestClockService;
 import org.apache.ignite.internal.lang.IgniteSystemProperties;
 import org.apache.ignite.internal.sql.ResultSetMetadataImpl;
 import org.apache.ignite.internal.sql.engine.SqlQueryType;
@@ -188,13 +190,17 @@ final class MappingTestRunner {
         }
     }
 
-    private String produceMapping(String nodeName,
+    private String produceMapping(
+            String nodeName,
             ExecutionTargetProvider targetProvider,
             LogicalTopologySnapshot snapshot,
-            MultiStepPlan plan) {
+            MultiStepPlan plan
+    ) {
 
         PartitionPruner partitionPruner = new PartitionPrunerImpl();
-        MappingServiceImpl mappingService = new MappingServiceImpl(nodeName,
+        MappingServiceImpl mappingService = new MappingServiceImpl(
+                nodeName,
+                new TestClockService(new 
TestHybridClock(System::currentTimeMillis)),
                 targetProvider,
                 EmptyCacheFactory.INSTANCE,
                 0,
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index 02493459c8..c3a921a731 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -48,6 +48,9 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.affinity.TokenizedAssignments;
+import org.apache.ignite.internal.affinity.TokenizedAssignmentsImpl;
 import org.apache.ignite.internal.catalog.CatalogCommand;
 import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.catalog.CatalogTestUtils;
@@ -73,6 +76,7 @@ import org.apache.ignite.internal.event.EventListener;
 import org.apache.ignite.internal.hlc.ClockWaiter;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.hlc.TestClockService;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
@@ -83,7 +87,6 @@ import 
org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
 import org.apache.ignite.internal.sql.engine.exec.ExecutableTable;
 import org.apache.ignite.internal.sql.engine.exec.ExecutableTableRegistry;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
-import org.apache.ignite.internal.sql.engine.exec.NodeWithConsistencyToken;
 import 
org.apache.ignite.internal.sql.engine.exec.PartitionWithConsistencyToken;
 import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
@@ -629,10 +632,10 @@ public class TestBuilders {
                     new DdlSqlToCommandConverter(), PLANNING_TIMEOUT, 
PLANNING_THREAD_COUNT,
                     mock(MetricManagerImpl.class), schemaManager);
 
-            Map<String, List<String>> owningNodesByTableName = new HashMap<>();
+            Map<String, List<List<String>>> owningNodesByTableName = new 
HashMap<>();
             for (Entry<String, Map<String, ScannableTable>> entry : 
nodeName2tableName2table.entrySet()) {
                 for (String tableName : entry.getValue().keySet()) {
-                    owningNodesByTableName.computeIfAbsent(tableName, key -> 
new ArrayList<>()).add(entry.getKey());
+                    owningNodesByTableName.computeIfAbsent(tableName, key -> 
new ArrayList<>()).add(List.of(entry.getKey()));
                 }
             }
 
@@ -683,6 +686,7 @@ public class TestBuilders {
                         var partitionPruner = new PartitionPrunerImpl();
                         var mappingService = new MappingServiceImpl(
                                 name,
+                                new TestClockService(clock, clockWaiter),
                                 targetProvider,
                                 EmptyCacheFactory.INSTANCE,
                                 0,
@@ -1469,7 +1473,7 @@ public class TestBuilders {
     /** A builder to create instances of {@link ExecutionTargetProvider}. */
     public static final class ExecutionTargetProviderBuilder {
 
-        private final Map<String, List<String>> owningNodesByTableName = new 
HashMap<>();
+        private final Map<String, List<List<String>>> owningNodesByTableName = 
new HashMap<>();
 
         private Function<String, List<String>> owningNodesBySystemViewName = 
(n) -> null;
 
@@ -1480,7 +1484,7 @@ public class TestBuilders {
         }
 
         /** Adds tables to list of nodes mapping. */
-        public ExecutionTargetProviderBuilder addTables(Map<String, 
List<String>> tables) {
+        public ExecutionTargetProviderBuilder addTables(Map<String, 
List<List<String>>> tables) {
             this.owningNodesByTableName.putAll(tables);
             return this;
         }
@@ -1514,13 +1518,13 @@ public class TestBuilders {
 
         final Function<String, List<String>> owningNodesBySystemViewName;
 
-        final Map<String, List<String>> owningNodesByTableName;
+        final Map<String, List<List<String>>> owningNodesByTableName;
 
         final boolean useTablePartitions;
 
         private TestNodeExecutionTargetProvider(
                 Function<String, List<String>> owningNodesBySystemViewName,
-                Map<String, List<String>> owningNodesByTableName,
+                Map<String, List<List<String>>> owningNodesByTableName,
                 boolean useTablePartitions
         ) {
             this.owningNodesBySystemViewName = owningNodesBySystemViewName;
@@ -1529,33 +1533,45 @@ public class TestBuilders {
         }
 
         @Override
-        public CompletableFuture<ExecutionTarget> 
forTable(ExecutionTargetFactory factory, IgniteTable table) {
-            List<String> owningNodes = 
owningNodesByTableName.get(table.name());
+        public CompletableFuture<ExecutionTarget> forTable(
+                HybridTimestamp operationTime,
+                ExecutionTargetFactory factory,
+                IgniteTable table,
+                boolean includeBackups
+        ) {
+            List<List<String>> owningNodes = 
owningNodesByTableName.get(table.name());
 
             if (nullOrEmpty(owningNodes)) {
                 throw new AssertionError("DataProvider is not configured for 
table " + table.name());
             }
 
-            List<NodeWithConsistencyToken> nodes;
+            List<TokenizedAssignments> assignments;
 
             if (useTablePartitions) {
                 int p = table.partitions();
 
-                nodes = IntStream.range(0, p).mapToObj(n -> {
-                    String nodeName = owningNodes.get(n % owningNodes.size());
-                    return new NodeWithConsistencyToken(nodeName, p);
+                assignments = IntStream.range(0, p).mapToObj(n -> {
+                    List<String> nodes = owningNodes.get(n % 
owningNodes.size());
+                    return partitionNodesToAssignment(nodes, p);
                 }).collect(Collectors.toList());
             } else {
-                nodes = owningNodes.stream()
-                        .map(name -> new NodeWithConsistencyToken(name, 1))
+                assignments = owningNodes.stream()
+                        .map(nodes -> partitionNodesToAssignment(nodes, 1))
                         .collect(Collectors.toList());
             }
 
-            ExecutionTarget target = factory.partitioned(nodes);
+            ExecutionTarget target = factory.partitioned(assignments);
 
             return CompletableFuture.completedFuture(target);
         }
 
+        private static TokenizedAssignments 
partitionNodesToAssignment(List<String> nodes, long token) {
+            return new TokenizedAssignmentsImpl(
+                    
nodes.stream().map(Assignment::forPeer).collect(Collectors.toSet()),
+                    token
+            );
+        }
+
         @Override
         public CompletableFuture<ExecutionTarget> 
forSystemView(ExecutionTargetFactory factory, IgniteSystemView view) {
             List<String> nodes = 
owningNodesBySystemViewName.apply(view.name());
diff --git a/modules/sql-engine/src/test/resources/mapping/correlated.test 
b/modules/sql-engine/src/test/resources/mapping/correlated.test
index 9dd930b7d4..b05ec49597 100644
--- a/modules/sql-engine/src/test/resources/mapping/correlated.test
+++ b/modules/sql-engine/src/test/resources/mapping/correlated.test
@@ -1,23 +1,14 @@
 N0
 SELECT (SELECT count(*) FROM ct_n1) FROM t_n1
 ---
-Fragment#4 root
+Fragment#0 root
   executionNodes: [N0]
-  remoteFragments: [5]
-  exchangeSourceNodes: {5=[N1]}
-  tree:
-    Receiver(sourceFragment=5, exchange=5, distribution=single)
-
-Fragment#1
-  targetNodes: [N1]
-  executionNodes: [N1]
-  tables: [T_N1]
-  partitions: {N1=[0:1]}
+  remoteFragments: [4]
+  exchangeSourceNodes: {4=[N1]}
   tree:
-    Sender(targetFragment=5, exchange=1, distribution=single)
-      TableScan(name=PUBLIC.T_N1, source=3, partitions=1, distribution=random)
+    Receiver(sourceFragment=4, exchange=4, distribution=single)
 
-Fragment#5
+Fragment#4
   targetNodes: [N0]
   executionNodes: [N1]
   remoteFragments: [1]
@@ -25,12 +16,21 @@ Fragment#5
   tables: [CT_N1]
   partitions: {N1=[0:1]}
   tree:
-    Sender(targetFragment=4, exchange=5, distribution=single)
+    Sender(targetFragment=0, exchange=4, distribution=single)
       Project
         NestedLoopJoin
           Receiver(sourceFragment=1, exchange=1, distribution=single)
           ColocatedHashAggregate
             TableScan(name=PUBLIC.CT_N1, source=2, partitions=1, 
distribution=single)
+
+Fragment#1
+  targetNodes: [N1]
+  executionNodes: [N1]
+  tables: [T_N1]
+  partitions: {N1=[0:1]}
+  tree:
+    Sender(targetFragment=4, exchange=1, distribution=single)
+      TableScan(name=PUBLIC.T_N1, source=3, partitions=1, distribution=random)
 ---
 
 N1
@@ -62,23 +62,14 @@ Fragment#1
 N0
 SELECT (SELECT count(*) FROM ct_n1) FROM t_n2
 ---
-Fragment#4 root
+Fragment#0 root
   executionNodes: [N0]
-  remoteFragments: [5]
-  exchangeSourceNodes: {5=[N1]}
-  tree:
-    Receiver(sourceFragment=5, exchange=5, distribution=single)
-
-Fragment#1
-  targetNodes: [N1]
-  executionNodes: [N2]
-  tables: [T_N2]
-  partitions: {N2=[0:1]}
+  remoteFragments: [4]
+  exchangeSourceNodes: {4=[N1]}
   tree:
-    Sender(targetFragment=5, exchange=1, distribution=single)
-      TableScan(name=PUBLIC.T_N2, source=3, partitions=1, distribution=random)
+    Receiver(sourceFragment=4, exchange=4, distribution=single)
 
-Fragment#5
+Fragment#4
   targetNodes: [N0]
   executionNodes: [N1]
   remoteFragments: [1]
@@ -86,12 +77,21 @@ Fragment#5
   tables: [CT_N1]
   partitions: {N1=[0:1]}
   tree:
-    Sender(targetFragment=4, exchange=5, distribution=single)
+    Sender(targetFragment=0, exchange=4, distribution=single)
       Project
         NestedLoopJoin
           Receiver(sourceFragment=1, exchange=1, distribution=single)
           ColocatedHashAggregate
             TableScan(name=PUBLIC.CT_N1, source=2, partitions=1, 
distribution=single)
+
+Fragment#1
+  targetNodes: [N1]
+  executionNodes: [N2]
+  tables: [T_N2]
+  partitions: {N2=[0:1]}
+  tree:
+    Sender(targetFragment=4, exchange=1, distribution=single)
+      TableScan(name=PUBLIC.T_N2, source=3, partitions=1, distribution=random)
 ---
 
 N1
@@ -148,20 +148,20 @@ N0
 SELECT t.c1 FROM ct_n1 t WHERE t.c1 < 5 AND
 EXISTS (SELECT x FROM table(system_range(t.c1, t.c2)) WHERE mod(x, 2) = 0)
 ---
-Fragment#3 root
+Fragment#0 root
   executionNodes: [N0]
-  remoteFragments: [4]
-  exchangeSourceNodes: {4=[N1]}
+  remoteFragments: [3]
+  exchangeSourceNodes: {3=[N1]}
   tree:
-    Receiver(sourceFragment=4, exchange=4, distribution=single)
+    Receiver(sourceFragment=3, exchange=3, distribution=single)
 
-Fragment#4
+Fragment#3
   targetNodes: [N0]
   executionNodes: [N1]
   tables: [CT_N1]
   partitions: {N1=[0:1]}
   tree:
-    Sender(targetFragment=3, exchange=4, distribution=single)
+    Sender(targetFragment=0, exchange=3, distribution=single)
       Project
         CorrelatedNestedLoopJoin
           TableScan(name=PUBLIC.CT_N1, source=1, partitions=1, 
distribution=single)
@@ -212,25 +212,25 @@ Fragment#0 root
         ReduceHashAggregate
           Receiver(sourceFragment=2, exchange=2, distribution=single)
 
-Fragment#2 correlated
+Fragment#1
   targetNodes: [N0]
   executionNodes: [N0, N1, N2]
-  tables: [T2_N0N1N2]
+  tables: [T1_N0N1N2]
   partitions: {N0=[0:3], N1=[1:3], N2=[2:3]}
-  pruningMetadata: [3=[{0=$cor0.ID}]]
   tree:
-    Sender(targetFragment=0, exchange=2, distribution=single)
-      MapHashAggregate
-        TableScan(name=PUBLIC.T2_N0N1N2, source=3, partitions=3, 
distribution=random)
+    Sender(targetFragment=0, exchange=1, distribution=single)
+      TableScan(name=PUBLIC.T1_N0N1N2, source=4, partitions=3, 
distribution=affinity[table: T1_N0N1N2, columns: [ID]])
 
-Fragment#1
+Fragment#2 correlated
   targetNodes: [N0]
   executionNodes: [N0, N1, N2]
-  tables: [T1_N0N1N2]
+  tables: [T2_N0N1N2]
   partitions: {N0=[0:3], N1=[1:3], N2=[2:3]}
+  pruningMetadata: [3=[{0=$cor1.ID}]]
   tree:
-    Sender(targetFragment=0, exchange=1, distribution=single)
-      TableScan(name=PUBLIC.T1_N0N1N2, source=4, partitions=3, 
distribution=affinity[table: T1_N0N1N2, columns: [ID]])
+    Sender(targetFragment=0, exchange=2, distribution=single)
+      MapHashAggregate
+        TableScan(name=PUBLIC.T2_N0N1N2, source=3, partitions=3, 
distribution=random)
 ---
 # Pass partition pruning metadata to correlated joins.
 N0
@@ -252,35 +252,35 @@ Fragment#0 root
               ReduceHashAggregate
                 Receiver(sourceFragment=3, exchange=3, distribution=single)
 
-Fragment#3 correlated
+Fragment#1
   targetNodes: [N0]
   executionNodes: [N0, N1, N2]
-  tables: [T2_N0N1N2]
+  tables: [T3_N0N1N2]
   partitions: {N0=[0:3], N1=[1:3], N2=[2:3]}
-  pruningMetadata: [4=[{0=$cor1.ID}]]
   tree:
-    Sender(targetFragment=0, exchange=3, distribution=single)
-      MapHashAggregate
-        TableScan(name=PUBLIC.T2_N0N1N2, source=4, partitions=3, 
distribution=random)
+    Sender(targetFragment=0, exchange=1, distribution=single)
+      TableScan(name=PUBLIC.T3_N0N1N2, source=6, partitions=3, 
distribution=affinity[table: T3_N0N1N2, columns: [ID]])
 
 Fragment#2 correlated
   targetNodes: [N0]
   executionNodes: [N0, N1, N2]
   tables: [T1_N0N1N2]
   partitions: {N0=[0:3], N1=[1:3], N2=[2:3]}
-  pruningMetadata: [5=[{0=$cor0.ID}]]
+  pruningMetadata: [5=[{0=$cor2.ID}]]
   tree:
     Sender(targetFragment=0, exchange=2, distribution=single)
       TableScan(name=PUBLIC.T1_N0N1N2, source=5, partitions=3, 
distribution=affinity[table: T1_N0N1N2, columns: [ID]])
 
-Fragment#1
+Fragment#3 correlated
   targetNodes: [N0]
   executionNodes: [N0, N1, N2]
-  tables: [T3_N0N1N2]
+  tables: [T2_N0N1N2]
   partitions: {N0=[0:3], N1=[1:3], N2=[2:3]}
+  pruningMetadata: [4=[{0=$cor3.ID}]]
   tree:
-    Sender(targetFragment=0, exchange=1, distribution=single)
-      TableScan(name=PUBLIC.T3_N0N1N2, source=6, partitions=3, 
distribution=affinity[table: T3_N0N1N2, columns: [ID]])
+    Sender(targetFragment=0, exchange=3, distribution=single)
+      MapHashAggregate
+        TableScan(name=PUBLIC.T2_N0N1N2, source=4, partitions=3, 
distribution=random)
 ---
 # Pass partition pruning metadata to correlated joins one layer deep.
 N0
@@ -305,33 +305,33 @@ Fragment#0 root
               ReduceHashAggregate
                 Receiver(sourceFragment=3, exchange=3, distribution=single)
 
-Fragment#3 correlated
+Fragment#1
   targetNodes: [N0]
   executionNodes: [N0, N1, N2]
-  tables: [T2_N0N1N2]
+  tables: [T3_N0N1N2]
   partitions: {N0=[0:3], N1=[1:3], N2=[2:3]}
-  pruningMetadata: [4=[{0=$cor0.ID}, {0=$cor2.ID}]]
   tree:
-    Sender(targetFragment=0, exchange=3, distribution=single)
-      MapHashAggregate
-        TableScan(name=PUBLIC.T2_N0N1N2, source=4, partitions=3, 
distribution=random)
+    Sender(targetFragment=0, exchange=1, distribution=single)
+      TableScan(name=PUBLIC.T3_N0N1N2, source=6, partitions=3, 
distribution=affinity[table: T3_N0N1N2, columns: [ID]])
 
 Fragment#2 correlated
   targetNodes: [N0]
   executionNodes: [N0, N1, N2]
   tables: [T1_N0N1N2]
   partitions: {N0=[0:3], N1=[1:3], N2=[2:3]}
-  pruningMetadata: [5=[{0=$cor0.ID}]]
+  pruningMetadata: [5=[{0=$cor3.ID}]]
   tree:
     Sender(targetFragment=0, exchange=2, distribution=single)
       TableScan(name=PUBLIC.T1_N0N1N2, source=5, partitions=3, 
distribution=affinity[table: T1_N0N1N2, columns: [ID]])
 
-Fragment#1
+Fragment#3 correlated
   targetNodes: [N0]
   executionNodes: [N0, N1, N2]
-  tables: [T3_N0N1N2]
+  tables: [T2_N0N1N2]
   partitions: {N0=[0:3], N1=[1:3], N2=[2:3]}
+  pruningMetadata: [4=[{0=$cor3.ID}, {0=$cor5.ID}]]
   tree:
-    Sender(targetFragment=0, exchange=1, distribution=single)
-      TableScan(name=PUBLIC.T3_N0N1N2, source=6, partitions=3, 
distribution=affinity[table: T3_N0N1N2, columns: [ID]])
+    Sender(targetFragment=0, exchange=3, distribution=single)
+      MapHashAggregate
+        TableScan(name=PUBLIC.T2_N0N1N2, source=4, partitions=3, 
distribution=random)
 ---
diff --git a/modules/sql-engine/src/test/resources/mapping/hash_join.test 
b/modules/sql-engine/src/test/resources/mapping/hash_join.test
index 7c4ae58c24..c559e6c88a 100644
--- a/modules/sql-engine/src/test/resources/mapping/hash_join.test
+++ b/modules/sql-engine/src/test/resources/mapping/hash_join.test
@@ -11,14 +11,25 @@ Fragment#0 root
 Fragment#1
   targetNodes: [N0]
   executionNodes: [N1]
-  tables: [T1_N1, T2_N1]
+  remoteFragments: [3]
+  exchangeSourceNodes: {3=[N1]}
+  tables: [T1_N1]
   partitions: {N1=[0:1]}
   tree:
     Sender(targetFragment=0, exchange=1, distribution=single)
       Project
         HashJoin
           TableScan(name=PUBLIC.T1_N1, source=2, partitions=1, 
distribution=affinity[table: T1_N1, columns: [ID]])
-          TableScan(name=PUBLIC.T2_N1, source=3, partitions=1, 
distribution=affinity[table: T2_N1, columns: [ID]])
+          Receiver(sourceFragment=3, exchange=3, distribution=affinity[table: 
T1_N1, columns: [ID]])
+
+Fragment#3
+  targetNodes: [N1]
+  executionNodes: [N1]
+  tables: [T2_N1]
+  partitions: {N1=[0:1]}
+  tree:
+    Sender(targetFragment=1, exchange=3, distribution=affinity[table: T1_N1, 
columns: [ID]])
+      TableScan(name=PUBLIC.T2_N1, source=4, partitions=1, 
distribution=affinity[table: T2_N1, columns: [ID]])
 ---
 
 N0
@@ -53,28 +64,28 @@ Fragment#0 root
   tree:
     Receiver(sourceFragment=1, exchange=1, distribution=single)
 
-Fragment#4
+Fragment#1
   targetNodes: [N0]
   executionNodes: [N1]
-  remoteFragments: [5]
-  exchangeSourceNodes: {5=[N2]}
-  tables: [T1_N1, T2_N2]
+  remoteFragments: [3]
+  exchangeSourceNodes: {3=[N2]}
+  tables: [T1_N1]
   partitions: {N1=[0:1]}
   tree:
     Sender(targetFragment=0, exchange=1, distribution=single)
       Project
         HashJoin
           TableScan(name=PUBLIC.T1_N1, source=2, partitions=1, 
distribution=affinity[table: T1_N1, columns: [ID]])
-          Receiver(sourceFragment=5, exchange=5, distribution=affinity[table: 
T2_N2, columns: [ID]])
+          Receiver(sourceFragment=3, exchange=3, distribution=affinity[table: 
T1_N1, columns: [ID]])
 
-Fragment#5
+Fragment#3
   targetNodes: [N1]
   executionNodes: [N2]
   tables: [T2_N2]
   partitions: {N2=[0:1]}
   tree:
-    Sender(targetFragment=4, exchange=5, distribution=affinity[table: T2_N2, 
columns: [ID]])
-      TableScan(name=PUBLIC.T2_N2, source=3, partitions=1, 
distribution=affinity[table: T2_N2, columns: [ID]])
+    Sender(targetFragment=1, exchange=3, distribution=affinity[table: T1_N1, 
columns: [ID]])
+      TableScan(name=PUBLIC.T2_N2, source=4, partitions=1, 
distribution=affinity[table: T2_N2, columns: [ID]])
 ---
 
 N0
diff --git a/modules/sql-engine/src/test/resources/mapping/merge_join.test 
b/modules/sql-engine/src/test/resources/mapping/merge_join.test
index c1d88b63bf..fd9ae5f287 100644
--- a/modules/sql-engine/src/test/resources/mapping/merge_join.test
+++ b/modules/sql-engine/src/test/resources/mapping/merge_join.test
@@ -1,53 +1,3 @@
-N0
-SELECT /*+ ENFORCE_JOIN_ORDER, DISABLE_RULE('HashJoinConverter', 
'NestedLoopJoinConverter', 'CorrelatedNestedLoopJoin') */ * FROM t1_n1 JOIN 
t2_n1 USING (id)
----
-Fragment#0 root
-  executionNodes: [N0]
-  remoteFragments: [1]
-  exchangeSourceNodes: {1=[N1]}
-  tree:
-    Receiver(sourceFragment=1, exchange=1, distribution=single)
-
-Fragment#1
-  targetNodes: [N0]
-  executionNodes: [N1]
-  tables: [T1_N1, T2_N1]
-  partitions: {N1=[0:1]}
-  tree:
-    Sender(targetFragment=0, exchange=1, distribution=single)
-      Project
-        MergeJoin
-          Sort
-            TableScan(name=PUBLIC.T1_N1, source=2, partitions=1, 
distribution=affinity[table: T1_N1, columns: [ID]])
-          Sort
-            TableScan(name=PUBLIC.T2_N1, source=3, partitions=1, 
distribution=affinity[table: T2_N1, columns: [ID]])
----
-
-N1
-SELECT /*+ ENFORCE_JOIN_ORDER, DISABLE_RULE('HashJoinConverter', 
'NestedLoopJoinConverter', 'CorrelatedNestedLoopJoin') */ * FROM t1_n1 JOIN 
t2_n1 USING (id)
----
-Fragment#0 root
-  executionNodes: [N1]
-  remoteFragments: [1]
-  exchangeSourceNodes: {1=[N1]}
-  tree:
-    Receiver(sourceFragment=1, exchange=1, distribution=single)
-
-Fragment#1
-  targetNodes: [N1]
-  executionNodes: [N1]
-  tables: [T1_N1, T2_N1]
-  partitions: {N1=[0:1]}
-  tree:
-    Sender(targetFragment=0, exchange=1, distribution=single)
-      Project
-        MergeJoin
-          Sort
-            TableScan(name=PUBLIC.T1_N1, source=2, partitions=1, 
distribution=affinity[table: T1_N1, columns: [ID]])
-          Sort
-            TableScan(name=PUBLIC.T2_N1, source=3, partitions=1, 
distribution=affinity[table: T2_N1, columns: [ID]])
----
-
 N0
 SELECT /*+ ENFORCE_JOIN_ORDER, DISABLE_RULE('HashJoinConverter', 
'NestedLoopJoinConverter', 'CorrelatedNestedLoopJoin') */ * FROM t1_n1 JOIN 
t2_n2 USING (id)
 ---
@@ -58,11 +8,11 @@ Fragment#0 root
   tree:
     Receiver(sourceFragment=1, exchange=1, distribution=single)
 
-Fragment#4
+Fragment#1
   targetNodes: [N0]
   executionNodes: [N1]
-  remoteFragments: [5]
-  exchangeSourceNodes: {5=[N2]}
+  remoteFragments: [3]
+  exchangeSourceNodes: {3=[N2]}
   tables: [T1_N1]
   partitions: {N1=[0:1]}
   tree:
@@ -71,15 +21,15 @@ Fragment#4
         MergeJoin
           Sort
             TableScan(name=PUBLIC.T1_N1, source=2, partitions=1, 
distribution=affinity[table: T1_N1, columns: [ID]])
-          Receiver(sourceFragment=5, exchange=5, distribution=affinity[table: 
T2_N2, columns: [ID]])
+          Sort
+            Receiver(sourceFragment=3, exchange=3, 
distribution=affinity[table: T1_N1, columns: [ID]])
 
-Fragment#5
+Fragment#3
   targetNodes: [N1]
   executionNodes: [N2]
   tables: [T2_N2]
   partitions: {N2=[0:1]}
   tree:
-    Sender(targetFragment=4, exchange=5, distribution=affinity[table: T2_N2, 
columns: [ID]])
-      Sort
-        TableScan(name=PUBLIC.T2_N2, source=3, partitions=1, 
distribution=affinity[table: T2_N2, columns: [ID]])
+    Sender(targetFragment=1, exchange=3, distribution=affinity[table: T1_N1, 
columns: [ID]])
+      TableScan(name=PUBLIC.T2_N2, source=4, partitions=1, 
distribution=affinity[table: T2_N2, columns: [ID]])
 ---
diff --git a/modules/sql-engine/src/test/resources/mapping/set_ops.test 
b/modules/sql-engine/src/test/resources/mapping/set_ops.test
index 377a4d2bbd..42aa6ec7f9 100644
--- a/modules/sql-engine/src/test/resources/mapping/set_ops.test
+++ b/modules/sql-engine/src/test/resources/mapping/set_ops.test
@@ -10,15 +10,6 @@ Fragment#0 root
       Receiver(sourceFragment=1, exchange=1, distribution=single)
       Receiver(sourceFragment=2, exchange=2, distribution=single)
 
-Fragment#2
-  targetNodes: [N1]
-  executionNodes: [N2]
-  tables: [T2_N2]
-  partitions: {N2=[0:1]}
-  tree:
-    Sender(targetFragment=0, exchange=2, distribution=single)
-      TableScan(name=PUBLIC.T2_N2, source=3, partitions=1, distribution=random)
-
 Fragment#1
   targetNodes: [N1]
   executionNodes: [N1]
@@ -27,6 +18,15 @@ Fragment#1
   tree:
     Sender(targetFragment=0, exchange=1, distribution=single)
       TableScan(name=PUBLIC.T1_N1, source=4, partitions=1, distribution=random)
+
+Fragment#2
+  targetNodes: [N1]
+  executionNodes: [N2]
+  tables: [T2_N2]
+  partitions: {N2=[0:1]}
+  tree:
+    Sender(targetFragment=0, exchange=2, distribution=single)
+      TableScan(name=PUBLIC.T2_N2, source=3, partitions=1, distribution=random)
 ---
 
 N1
diff --git a/modules/sql-engine/src/test/resources/mapping/table_identity.test 
b/modules/sql-engine/src/test/resources/mapping/table_identity.test
index d7f03a6109..e13befad79 100644
--- a/modules/sql-engine/src/test/resources/mapping/table_identity.test
+++ b/modules/sql-engine/src/test/resources/mapping/table_identity.test
@@ -10,15 +10,6 @@ Fragment#0 root
       Receiver(sourceFragment=1, exchange=1, distribution=single)
       Receiver(sourceFragment=2, exchange=2, distribution=single)
 
-Fragment#2
-  targetNodes: [N0]
-  executionNodes: [N2]
-  tables: [NT2_N2]
-  partitions: {N2=[0:1]}
-  tree:
-    Sender(targetFragment=0, exchange=2, distribution=single)
-      TableScan(name=PUBLIC.NT2_N2, source=3, partitions=1, 
distribution=identity[0])
-
 Fragment#1
   targetNodes: [N0]
   executionNodes: [N1]
@@ -27,6 +18,15 @@ Fragment#1
   tree:
     Sender(targetFragment=0, exchange=1, distribution=single)
       TableScan(name=PUBLIC.NT1_N1, source=4, partitions=1, 
distribution=identity[0])
+
+Fragment#2
+  targetNodes: [N0]
+  executionNodes: [N2]
+  tables: [NT2_N2]
+  partitions: {N2=[0:1]}
+  tree:
+    Sender(targetFragment=0, exchange=2, distribution=single)
+      TableScan(name=PUBLIC.NT2_N2, source=3, partitions=1, 
distribution=identity[0])
 ---
 
 N1
@@ -41,15 +41,6 @@ Fragment#0 root
       Receiver(sourceFragment=1, exchange=1, distribution=single)
       Receiver(sourceFragment=2, exchange=2, distribution=single)
 
-Fragment#2
-  targetNodes: [N1]
-  executionNodes: [N2]
-  tables: [NT2_N2]
-  partitions: {N2=[0:1]}
-  tree:
-    Sender(targetFragment=0, exchange=2, distribution=single)
-      TableScan(name=PUBLIC.NT2_N2, source=3, partitions=1, 
distribution=identity[0])
-
 Fragment#1
   targetNodes: [N1]
   executionNodes: [N1]
@@ -58,6 +49,15 @@ Fragment#1
   tree:
     Sender(targetFragment=0, exchange=1, distribution=single)
       TableScan(name=PUBLIC.NT1_N1, source=4, partitions=1, 
distribution=identity[0])
+
+Fragment#2
+  targetNodes: [N1]
+  executionNodes: [N2]
+  tables: [NT2_N2]
+  partitions: {N2=[0:1]}
+  tree:
+    Sender(targetFragment=0, exchange=2, distribution=single)
+      TableScan(name=PUBLIC.NT2_N2, source=3, partitions=1, 
distribution=identity[0])
 ---
 
 N0
@@ -72,23 +72,23 @@ Fragment#0 root
       Receiver(sourceFragment=1, exchange=1, distribution=single)
       Receiver(sourceFragment=2, exchange=2, distribution=single)
 
-Fragment#2
+Fragment#1
   targetNodes: [N0]
   executionNodes: [N1]
-  tables: [NT2_N1]
+  tables: [NT1_N1]
   partitions: {N1=[0:1]}
   tree:
-    Sender(targetFragment=0, exchange=2, distribution=single)
-      TableScan(name=PUBLIC.NT2_N1, source=3, partitions=1, 
distribution=identity[0])
+    Sender(targetFragment=0, exchange=1, distribution=single)
+      TableScan(name=PUBLIC.NT1_N1, source=4, partitions=1, 
distribution=identity[0])
 
-Fragment#1
+Fragment#2
   targetNodes: [N0]
   executionNodes: [N1]
-  tables: [NT1_N1]
+  tables: [NT2_N1]
   partitions: {N1=[0:1]}
   tree:
-    Sender(targetFragment=0, exchange=1, distribution=single)
-      TableScan(name=PUBLIC.NT1_N1, source=4, partitions=1, 
distribution=identity[0])
+    Sender(targetFragment=0, exchange=2, distribution=single)
+      TableScan(name=PUBLIC.NT2_N1, source=3, partitions=1, 
distribution=identity[0])
 ---
 
 N1
@@ -103,21 +103,21 @@ Fragment#0 root
       Receiver(sourceFragment=1, exchange=1, distribution=single)
       Receiver(sourceFragment=2, exchange=2, distribution=single)
 
-Fragment#2
+Fragment#1
   targetNodes: [N1]
   executionNodes: [N1]
-  tables: [NT2_N1]
+  tables: [NT1_N1]
   partitions: {N1=[0:1]}
   tree:
-    Sender(targetFragment=0, exchange=2, distribution=single)
-      TableScan(name=PUBLIC.NT2_N1, source=3, partitions=1, 
distribution=identity[0])
+    Sender(targetFragment=0, exchange=1, distribution=single)
+      TableScan(name=PUBLIC.NT1_N1, source=4, partitions=1, 
distribution=identity[0])
 
-Fragment#1
+Fragment#2
   targetNodes: [N1]
   executionNodes: [N1]
-  tables: [NT1_N1]
+  tables: [NT2_N1]
   partitions: {N1=[0:1]}
   tree:
-    Sender(targetFragment=0, exchange=1, distribution=single)
-      TableScan(name=PUBLIC.NT1_N1, source=4, partitions=1, 
distribution=identity[0])
+    Sender(targetFragment=0, exchange=2, distribution=single)
+      TableScan(name=PUBLIC.NT2_N1, source=3, partitions=1, 
distribution=identity[0])
 ---
diff --git 
a/modules/sql-engine/src/test/resources/mapping/table_identity_single.test 
b/modules/sql-engine/src/test/resources/mapping/table_identity_single.test
index 006200b299..ce1d33459b 100644
--- a/modules/sql-engine/src/test/resources/mapping/table_identity_single.test
+++ b/modules/sql-engine/src/test/resources/mapping/table_identity_single.test
@@ -1,23 +1,14 @@
 N0
 SELECT * FROM CT_n1, NT_n1
 ---
-Fragment#4 root
+Fragment#0 root
   executionNodes: [N0]
-  remoteFragments: [5]
-  exchangeSourceNodes: {5=[N1]}
+  remoteFragments: [4]
+  exchangeSourceNodes: {4=[N1]}
   tree:
-    Receiver(sourceFragment=5, exchange=5, distribution=single)
-
-Fragment#2
-  targetNodes: [N1]
-  executionNodes: [N1]
-  tables: [NT_N1]
-  partitions: {N1=[0:1]}
-  tree:
-    Sender(targetFragment=5, exchange=2, distribution=single)
-      TableScan(name=PUBLIC.NT_N1, source=3, partitions=1, 
distribution=identity[0])
+    Receiver(sourceFragment=4, exchange=4, distribution=single)
 
-Fragment#5
+Fragment#4
   targetNodes: [N0]
   executionNodes: [N1]
   remoteFragments: [2]
@@ -25,10 +16,19 @@ Fragment#5
   tables: [CT_N1]
   partitions: {N1=[0:1]}
   tree:
-    Sender(targetFragment=4, exchange=5, distribution=single)
+    Sender(targetFragment=0, exchange=4, distribution=single)
       NestedLoopJoin
         TableScan(name=PUBLIC.CT_N1, source=1, partitions=1, 
distribution=single)
         Receiver(sourceFragment=2, exchange=2, distribution=single)
+
+Fragment#2
+  targetNodes: [N1]
+  executionNodes: [N1]
+  tables: [NT_N1]
+  partitions: {N1=[0:1]}
+  tree:
+    Sender(targetFragment=4, exchange=2, distribution=single)
+      TableScan(name=PUBLIC.NT_N1, source=3, partitions=1, 
distribution=identity[0])
 ---
 
 N1
@@ -58,23 +58,14 @@ Fragment#2
 N0
 SELECT * FROM CT_n1, NT_n2
 ---
-Fragment#4 root
+Fragment#0 root
   executionNodes: [N0]
-  remoteFragments: [5]
-  exchangeSourceNodes: {5=[N1]}
+  remoteFragments: [4]
+  exchangeSourceNodes: {4=[N1]}
   tree:
-    Receiver(sourceFragment=5, exchange=5, distribution=single)
-
-Fragment#2
-  targetNodes: [N1]
-  executionNodes: [N2]
-  tables: [NT_N2]
-  partitions: {N2=[0:1]}
-  tree:
-    Sender(targetFragment=5, exchange=2, distribution=single)
-      TableScan(name=PUBLIC.NT_N2, source=3, partitions=1, 
distribution=identity[0])
+    Receiver(sourceFragment=4, exchange=4, distribution=single)
 
-Fragment#5
+Fragment#4
   targetNodes: [N0]
   executionNodes: [N1]
   remoteFragments: [2]
@@ -82,10 +73,19 @@ Fragment#5
   tables: [CT_N1]
   partitions: {N1=[0:1]}
   tree:
-    Sender(targetFragment=4, exchange=5, distribution=single)
+    Sender(targetFragment=0, exchange=4, distribution=single)
       NestedLoopJoin
         TableScan(name=PUBLIC.CT_N1, source=1, partitions=1, 
distribution=single)
         Receiver(sourceFragment=2, exchange=2, distribution=single)
+
+Fragment#2
+  targetNodes: [N1]
+  executionNodes: [N2]
+  tables: [NT_N2]
+  partitions: {N2=[0:1]}
+  tree:
+    Sender(targetFragment=4, exchange=2, distribution=single)
+      TableScan(name=PUBLIC.NT_N2, source=3, partitions=1, 
distribution=identity[0])
 ---
 
 N1
diff --git a/modules/sql-engine/src/test/resources/mapping/table_single.test 
b/modules/sql-engine/src/test/resources/mapping/table_single.test
index 12d7342b82..7a2aa7ea1d 100644
--- a/modules/sql-engine/src/test/resources/mapping/table_single.test
+++ b/modules/sql-engine/src/test/resources/mapping/table_single.test
@@ -1,20 +1,20 @@
 N0
 SELECT * FROM ct1_n1, ct2_n1
 ---
-Fragment#3 root
+Fragment#0 root
   executionNodes: [N0]
-  remoteFragments: [4]
-  exchangeSourceNodes: {4=[N1]}
+  remoteFragments: [3]
+  exchangeSourceNodes: {3=[N1]}
   tree:
-    Receiver(sourceFragment=4, exchange=4, distribution=single)
+    Receiver(sourceFragment=3, exchange=3, distribution=single)
 
-Fragment#4
+Fragment#3
   targetNodes: [N0]
   executionNodes: [N1]
   tables: [CT1_N1, CT2_N1]
   partitions: {N1=[0:1]}
   tree:
-    Sender(targetFragment=3, exchange=4, distribution=single)
+    Sender(targetFragment=0, exchange=3, distribution=single)
       NestedLoopJoin
         TableScan(name=PUBLIC.CT1_N1, source=1, partitions=1, 
distribution=single)
         TableScan(name=PUBLIC.CT2_N1, source=2, partitions=1, 
distribution=single)
@@ -36,65 +36,65 @@ Fragment#0 root
 N0
 SELECT * FROM ct1_n1, ct2_n2
 ---
-Fragment#5 root
+Fragment#0 root
   executionNodes: [N0]
-  remoteFragments: [6]
-  exchangeSourceNodes: {6=[N2]}
+  remoteFragments: [4]
+  exchangeSourceNodes: {4=[N2]}
   tree:
-    Receiver(sourceFragment=6, exchange=6, distribution=single)
+    Receiver(sourceFragment=4, exchange=4, distribution=single)
 
 Fragment#4
-  targetNodes: [N2]
-  executionNodes: [N1]
-  tables: [CT1_N1]
-  partitions: {N1=[0:1]}
-  tree:
-    Sender(targetFragment=6, exchange=4, distribution=single)
-      TableScan(name=PUBLIC.CT1_N1, source=1, partitions=1, 
distribution=single)
-
-Fragment#6
   targetNodes: [N0]
   executionNodes: [N2]
-  remoteFragments: [4]
-  exchangeSourceNodes: {4=[N1]}
+  remoteFragments: [3]
+  exchangeSourceNodes: {3=[N1]}
   tables: [CT2_N2]
   partitions: {N2=[0:1]}
   tree:
-    Sender(targetFragment=5, exchange=6, distribution=single)
+    Sender(targetFragment=0, exchange=4, distribution=single)
       NestedLoopJoin
-        Receiver(sourceFragment=4, exchange=4, distribution=single)
+        Receiver(sourceFragment=3, exchange=3, distribution=single)
         TableScan(name=PUBLIC.CT2_N2, source=2, partitions=1, 
distribution=single)
+
+Fragment#3
+  targetNodes: [N2]
+  executionNodes: [N1]
+  tables: [CT1_N1]
+  partitions: {N1=[0:1]}
+  tree:
+    Sender(targetFragment=4, exchange=3, distribution=single)
+      TableScan(name=PUBLIC.CT1_N1, source=1, partitions=1, 
distribution=single)
 ---
 
 N1
 SELECT * FROM ct1_n1, ct2_n2
 ---
-Fragment#5 root
+Fragment#0 root
   executionNodes: [N1]
-  remoteFragments: [6]
-  exchangeSourceNodes: {6=[N2]}
+  remoteFragments: [4]
+  exchangeSourceNodes: {4=[N2]}
   tree:
-    Receiver(sourceFragment=6, exchange=6, distribution=single)
+    Receiver(sourceFragment=4, exchange=4, distribution=single)
 
 Fragment#4
-  targetNodes: [N2]
-  executionNodes: [N1]
-  tables: [CT1_N1]
-  partitions: {N1=[0:1]}
-  tree:
-    Sender(targetFragment=6, exchange=4, distribution=single)
-      TableScan(name=PUBLIC.CT1_N1, source=1, partitions=1, 
distribution=single)
-
-Fragment#6
   targetNodes: [N1]
   executionNodes: [N2]
-  remoteFragments: [4]
-  exchangeSourceNodes: {4=[N1]}
+  remoteFragments: [3]
+  exchangeSourceNodes: {3=[N1]}
   tables: [CT2_N2]
   partitions: {N2=[0:1]}
   tree:
-    Sender(targetFragment=5, exchange=6, distribution=single)
+    Sender(targetFragment=0, exchange=4, distribution=single)
       NestedLoopJoin
-        Receiver(sourceFragment=4, exchange=4, distribution=single)
+        Receiver(sourceFragment=3, exchange=3, distribution=single)
         TableScan(name=PUBLIC.CT2_N2, source=2, partitions=1, 
distribution=single)
+
+Fragment#3
+  targetNodes: [N2]
+  executionNodes: [N1]
+  tables: [CT1_N1]
+  partitions: {N1=[0:1]}
+  tree:
+    Sender(targetFragment=4, exchange=3, distribution=single)
+      TableScan(name=PUBLIC.CT1_N1, source=1, partitions=1, 
distribution=single)
 ---
diff --git 
a/modules/sql-engine/src/test/resources/mapping/test_backup_mapping.test 
b/modules/sql-engine/src/test/resources/mapping/test_backup_mapping.test
new file mode 100644
index 0000000000..e3e833b50f
--- /dev/null
+++ b/modules/sql-engine/src/test/resources/mapping/test_backup_mapping.test
@@ -0,0 +1,162 @@
+# root of the query is colocated with first partition
+N0
+SELECT * FROM t1_n0n1_n1n2
+---
+Fragment#0 root
+  executionNodes: [N0]
+  remoteFragments: [1]
+  exchangeSourceNodes: {1=[N0, N1]}
+  tree:
+    Receiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+  targetNodes: [N0]
+  executionNodes: [N0, N1]
+  tables: [T1_N0N1_N1N2]
+  partitions: {N0=[0:2], N1=[1:2]}
+  tree:
+    Sender(targetFragment=0, exchange=1, distribution=single)
+      TableScan(name=PUBLIC.T1_N0N1_N1N2, source=2, partitions=2, 
distribution=affinity[table: T1_N0N1_N1N2, columns: [ID]])
+---
+# root of the query is colocated with both partitions
+N1
+SELECT * FROM t1_n0n1_n1n2
+---
+Fragment#0 root
+  executionNodes: [N1]
+  remoteFragments: [1]
+  exchangeSourceNodes: {1=[N1]}
+  tree:
+    Receiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+  targetNodes: [N1]
+  executionNodes: [N1]
+  tables: [T1_N0N1_N1N2]
+  partitions: {N1=[0:2, 1:2]}
+  tree:
+    Sender(targetFragment=0, exchange=1, distribution=single)
+      TableScan(name=PUBLIC.T1_N0N1_N1N2, source=2, partitions=2, 
distribution=affinity[table: T1_N0N1_N1N2, columns: [ID]])
+---
+# root of the query is colocated with second partition
+N2
+SELECT * FROM t1_n0n1_n1n2
+---
+Fragment#0 root
+  executionNodes: [N2]
+  remoteFragments: [1]
+  exchangeSourceNodes: {1=[N0, N2]}
+  tree:
+    Receiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+  targetNodes: [N2]
+  executionNodes: [N0, N2]
+  tables: [T1_N0N1_N1N2]
+  partitions: {N0=[0:2], N2=[1:2]}
+  tree:
+    Sender(targetFragment=0, exchange=1, distribution=single)
+      TableScan(name=PUBLIC.T1_N0N1_N1N2, source=2, partitions=2, 
distribution=affinity[table: T1_N0N1_N1N2, columns: [ID]])
+---
+# although root is colocated with one of the partitions, algorithm prefers to 
colocate join stage
+N0
+SELECT * FROM t2_n0n1n2 as t1, t3_n1n2 as t2 WHERE t1.id = t2.id
+---
+Fragment#0 root
+  executionNodes: [N0]
+  remoteFragments: [1]
+  exchangeSourceNodes: {1=[N1]}
+  tree:
+    Receiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+  targetNodes: [N0]
+  executionNodes: [N1]
+  remoteFragments: [3]
+  exchangeSourceNodes: {3=[N1]}
+  tables: [T3_N1N2]
+  partitions: {N1=[0:1]}
+  tree:
+    Sender(targetFragment=0, exchange=1, distribution=single)
+      Project
+        HashJoin
+          TableScan(name=PUBLIC.T3_N1N2, source=2, partitions=1, 
distribution=affinity[table: T3_N1N2, columns: [ID]])
+          Receiver(sourceFragment=3, exchange=3, distribution=affinity[table: 
T3_N1N2, columns: [ID]])
+
+Fragment#3
+  targetNodes: [N1]
+  executionNodes: [N1]
+  tables: [T2_N0N1N2]
+  partitions: {N1=[0:1]}
+  tree:
+    Sender(targetFragment=1, exchange=3, distribution=affinity[table: T3_N1N2, 
columns: [ID]])
+      TableScan(name=PUBLIC.T2_N0N1N2, source=4, partitions=1, 
distribution=affinity[table: T2_N0N1N2, columns: [ID]])
+---
+# everything is colocated
+N1
+SELECT * FROM t2_n0n1n2 as t1, t3_n1n2 as t2 WHERE t1.id = t2.id
+---
+Fragment#0 root
+  executionNodes: [N1]
+  remoteFragments: [1]
+  exchangeSourceNodes: {1=[N1]}
+  tree:
+    Receiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+  targetNodes: [N1]
+  executionNodes: [N1]
+  remoteFragments: [3]
+  exchangeSourceNodes: {3=[N1]}
+  tables: [T3_N1N2]
+  partitions: {N1=[0:1]}
+  tree:
+    Sender(targetFragment=0, exchange=1, distribution=single)
+      Project
+        HashJoin
+          TableScan(name=PUBLIC.T3_N1N2, source=2, partitions=1, 
distribution=affinity[table: T3_N1N2, columns: [ID]])
+          Receiver(sourceFragment=3, exchange=3, distribution=affinity[table: 
T3_N1N2, columns: [ID]])
+
+Fragment#3
+  targetNodes: [N1]
+  executionNodes: [N1]
+  tables: [T2_N0N1N2]
+  partitions: {N1=[0:1]}
+  tree:
+    Sender(targetFragment=1, exchange=3, distribution=affinity[table: T3_N1N2, 
columns: [ID]])
+      TableScan(name=PUBLIC.T2_N0N1N2, source=4, partitions=1, 
distribution=affinity[table: T2_N0N1N2, columns: [ID]])
+---
+# everything is colocated, but from different root
+N2
+SELECT * FROM t2_n0n1n2 as t1, t3_n1n2 as t2 WHERE t1.id = t2.id
+---
+Fragment#0 root
+  executionNodes: [N2]
+  remoteFragments: [1]
+  exchangeSourceNodes: {1=[N2]}
+  tree:
+    Receiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+  targetNodes: [N2]
+  executionNodes: [N2]
+  remoteFragments: [3]
+  exchangeSourceNodes: {3=[N2]}
+  tables: [T3_N1N2]
+  partitions: {N2=[0:1]}
+  tree:
+    Sender(targetFragment=0, exchange=1, distribution=single)
+      Project
+        HashJoin
+          TableScan(name=PUBLIC.T3_N1N2, source=2, partitions=1, 
distribution=affinity[table: T3_N1N2, columns: [ID]])
+          Receiver(sourceFragment=3, exchange=3, distribution=affinity[table: 
T3_N1N2, columns: [ID]])
+
+Fragment#3
+  targetNodes: [N2]
+  executionNodes: [N2]
+  tables: [T2_N0N1N2]
+  partitions: {N2=[0:1]}
+  tree:
+    Sender(targetFragment=1, exchange=3, distribution=affinity[table: T3_N1N2, 
columns: [ID]])
+      TableScan(name=PUBLIC.T2_N0N1N2, source=4, partitions=1, 
distribution=affinity[table: T2_N0N1N2, columns: [ID]])
+---
diff --git 
a/modules/sql-engine/src/test/resources/mapping/test_partition_pruning.test 
b/modules/sql-engine/src/test/resources/mapping/test_partition_pruning.test
index 26ee11a006..371bfcaaae 100644
--- a/modules/sql-engine/src/test/resources/mapping/test_partition_pruning.test
+++ b/modules/sql-engine/src/test/resources/mapping/test_partition_pruning.test
@@ -31,16 +31,6 @@ Fragment#0 root
       Receiver(sourceFragment=1, exchange=1, distribution=single)
       Receiver(sourceFragment=2, exchange=2, distribution=single)
 
-Fragment#2
-  targetNodes: [N1]
-  executionNodes: [N4]
-  tables: [T2_N4N5]
-  partitions: {N4=[0:2]}
-  tree:
-    Sender(targetFragment=0, exchange=2, distribution=single)
-      Sort
-        TableScan(name=PUBLIC.T2_N4N5, source=3, partitions=2, 
distribution=affinity[table: T2_N4N5, columns: [ID]])
-
 Fragment#1
   targetNodes: [N1]
   executionNodes: [N2]
@@ -50,6 +40,16 @@ Fragment#1
     Sender(targetFragment=0, exchange=1, distribution=single)
       Sort
         TableScan(name=PUBLIC.T1_N1N2N3, source=4, partitions=3, 
distribution=affinity[table: T1_N1N2N3, columns: [ID]])
+
+Fragment#2
+  targetNodes: [N1]
+  executionNodes: [N4]
+  tables: [T2_N4N5]
+  partitions: {N4=[0:2]}
+  tree:
+    Sender(targetFragment=0, exchange=2, distribution=single)
+      Sort
+        TableScan(name=PUBLIC.T2_N4N5, source=3, partitions=2, 
distribution=affinity[table: T2_N4N5, columns: [ID]])
 ---
 # Self join, different predicates that produce same set of partitions
 N1
@@ -64,25 +64,25 @@ Fragment#0 root
       Receiver(sourceFragment=1, exchange=1, distribution=single)
       Receiver(sourceFragment=2, exchange=2, distribution=single)
 
-Fragment#2
+Fragment#1
   targetNodes: [N1]
   executionNodes: [N2]
   tables: [T1_N1N2N3]
   partitions: {N2=[1:3]}
   tree:
-    Sender(targetFragment=0, exchange=2, distribution=single)
+    Sender(targetFragment=0, exchange=1, distribution=single)
       Sort
-        TableScan(name=PUBLIC.T1_N1N2N3, source=3, partitions=3, 
distribution=affinity[table: T1_N1N2N3, columns: [ID]])
+        TableScan(name=PUBLIC.T1_N1N2N3, source=4, partitions=3, 
distribution=affinity[table: T1_N1N2N3, columns: [ID]])
 
-Fragment#1
+Fragment#2
   targetNodes: [N1]
   executionNodes: [N2]
   tables: [T1_N1N2N3]
   partitions: {N2=[1:3]}
   tree:
-    Sender(targetFragment=0, exchange=1, distribution=single)
+    Sender(targetFragment=0, exchange=2, distribution=single)
       Sort
-        TableScan(name=PUBLIC.T1_N1N2N3, source=4, partitions=3, 
distribution=affinity[table: T1_N1N2N3, columns: [ID]])
+        TableScan(name=PUBLIC.T1_N1N2N3, source=3, partitions=3, 
distribution=affinity[table: T1_N1N2N3, columns: [ID]])
 ---
 # Self join, different predicates that produce disjoint set of partitions
 N1
@@ -125,25 +125,25 @@ Fragment#0 root
         ReduceHashAggregate
           Receiver(sourceFragment=2, exchange=2, distribution=single)
 
+Fragment#1
+  targetNodes: [N0]
+  executionNodes: [N3]
+  tables: [T1_N1N2N3]
+  partitions: {N3=[2:3]}
+  tree:
+    Sender(targetFragment=0, exchange=1, distribution=single)
+      TableScan(name=PUBLIC.T1_N1N2N3, source=4, partitions=3, 
distribution=affinity[table: T1_N1N2N3, columns: [ID]])
+
 Fragment#2 correlated
   targetNodes: [N0]
   executionNodes: [N1, N2, N3]
   tables: [T3_N1N2N3]
   partitions: {N1=[0:3], N2=[1:3], N3=[2:3]}
-  pruningMetadata: [3=[{0=$cor0.ID}]]
+  pruningMetadata: [3=[{0=$cor1.ID}]]
   tree:
     Sender(targetFragment=0, exchange=2, distribution=single)
       MapHashAggregate
         TableScan(name=PUBLIC.T3_N1N2N3, source=3, partitions=3, 
distribution=random)
-
-Fragment#1
-  targetNodes: [N0]
-  executionNodes: [N3]
-  tables: [T1_N1N2N3]
-  partitions: {N3=[2:3]}
-  tree:
-    Sender(targetFragment=0, exchange=1, distribution=single)
-      TableScan(name=PUBLIC.T1_N1N2N3, source=4, partitions=3, 
distribution=affinity[table: T1_N1N2N3, columns: [ID]])
 ---
 # Correlated.
 # Prune partitions from left arm statically, and pass meta to the right arm.
@@ -162,23 +162,23 @@ Fragment#0 root
         ReduceHashAggregate
           Receiver(sourceFragment=2, exchange=2, distribution=single)
 
+Fragment#1
+  targetNodes: [N0]
+  executionNodes: [N3]
+  tables: [T1_N1N2N3]
+  partitions: {N3=[2:3]}
+  tree:
+    Sender(targetFragment=0, exchange=1, distribution=single)
+      TableScan(name=PUBLIC.T1_N1N2N3, source=4, partitions=3, 
distribution=affinity[table: T1_N1N2N3, columns: [ID]])
+
 Fragment#2 correlated
   targetNodes: [N0]
   executionNodes: [N4, N5]
   tables: [T2_N4N5]
   partitions: {N4=[0:2], N5=[1:2]}
-  pruningMetadata: [3=[{0=$cor0.ID}]]
+  pruningMetadata: [3=[{0=$cor1.ID}]]
   tree:
     Sender(targetFragment=0, exchange=2, distribution=single)
       MapHashAggregate
         TableScan(name=PUBLIC.T2_N4N5, source=3, partitions=2, 
distribution=random)
-
-Fragment#1
-  targetNodes: [N0]
-  executionNodes: [N3]
-  tables: [T1_N1N2N3]
-  partitions: {N3=[2:3]}
-  tree:
-    Sender(targetFragment=0, exchange=1, distribution=single)
-      TableScan(name=PUBLIC.T1_N1N2N3, source=4, partitions=3, 
distribution=affinity[table: T1_N1N2N3, columns: [ID]])
 ---
diff --git a/modules/sql-engine/src/test/resources/mapping/union.test 
b/modules/sql-engine/src/test/resources/mapping/union.test
index 1ab8915466..f364a29779 100644
--- a/modules/sql-engine/src/test/resources/mapping/union.test
+++ b/modules/sql-engine/src/test/resources/mapping/union.test
@@ -37,14 +37,25 @@ Fragment#0 root
 Fragment#1
   targetNodes: [N1]
   executionNodes: [N1]
-  tables: [T1_N1, T2_N1]
+  remoteFragments: [2]
+  exchangeSourceNodes: {2=[N1]}
+  tables: [T2_N1]
   partitions: {N1=[0:1]}
   tree:
     Sender(targetFragment=0, exchange=1, distribution=single)
       ColocatedHashAggregate
         UnionAll
-          TableScan(name=PUBLIC.T1_N1, source=2, partitions=1, 
distribution=affinity[table: T1_N1, columns: [ID]])
+          Receiver(sourceFragment=2, exchange=2, distribution=affinity[table: 
T2_N1, columns: [ID]])
           TableScan(name=PUBLIC.T2_N1, source=3, partitions=1, 
distribution=affinity[table: T2_N1, columns: [ID]])
+
+Fragment#2
+  targetNodes: [N1]
+  executionNodes: [N1]
+  tables: [T1_N1]
+  partitions: {N1=[0:1]}
+  tree:
+    Sender(targetFragment=1, exchange=2, distribution=affinity[table: T2_N1, 
columns: [ID]])
+      TableScan(name=PUBLIC.T1_N1, source=4, partitions=1, 
distribution=affinity[table: T1_N1, columns: [ID]])
 ---
 
 N1
@@ -86,12 +97,23 @@ Fragment#0 root
 Fragment#1
   targetNodes: [N1]
   executionNodes: [N1, N2]
-  tables: [T1_N1N2, T2_N1N2]
+  remoteFragments: [2]
+  exchangeSourceNodes: {2=[N1, N2]}
+  tables: [T2_N1N2]
   partitions: {N1=[0:2], N2=[1:2]}
   tree:
     Sender(targetFragment=0, exchange=1, distribution=single)
       ColocatedHashAggregate
         UnionAll
-          TableScan(name=PUBLIC.T1_N1N2, source=2, partitions=2, 
distribution=affinity[table: T1_N1N2, columns: [ID]])
+          Receiver(sourceFragment=2, exchange=2, distribution=affinity[table: 
T2_N1N2, columns: [ID]])
           TableScan(name=PUBLIC.T2_N1N2, source=3, partitions=2, 
distribution=affinity[table: T2_N1N2, columns: [ID]])
+
+Fragment#2
+  targetNodes: [N1, N2]
+  executionNodes: [N1, N2]
+  tables: [T1_N1N2]
+  partitions: {N1=[0:2], N2=[1:2]}
+  tree:
+    Sender(targetFragment=1, exchange=2, distribution=affinity[table: T2_N1N2, 
columns: [ID]])
+      TableScan(name=PUBLIC.T1_N1N2, source=4, partitions=2, 
distribution=affinity[table: T1_N1N2, columns: [ID]])
 ---

Reply via email to