This is an automated email from the ASF dual-hosted git repository.

korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 894e6982322 IGNITE-26521 Sql. Flaky 
ItUnstableTopologyTest.ensureLostOfNodeDoesntCausesQueryToFail (#6958)
894e6982322 is described below

commit 894e69823222649ed856372c8ed22347f1e3454c
Author: Max Zhuravkov <[email protected]>
AuthorDate: Thu Nov 20 13:17:41 2025 +0000

    IGNITE-26521 Sql. Flaky 
ItUnstableTopologyTest.ensureLostOfNodeDoesntCausesQueryToFail (#6958)
---
 .../internal/sql/engine/SqlOperationContext.java   |  23 ++++-
 .../internal/sql/engine/SqlQueryProcessor.java     |   5 +-
 .../internal/sql/engine/exec/ExecutionContext.java |  15 +++-
 .../sql/engine/exec/ExecutionServiceImpl.java      |  98 +++++++++++++++-----
 .../sql/engine/exec/MailboxRegistryImpl.java       |  19 ++--
 .../{MappingService.java => MappedFragments.java}  |  34 ++++---
 .../sql/engine/exec/mapping/MappingService.java    |   3 +-
 .../engine/exec/mapping/MappingServiceImpl.java    |  19 ++--
 .../ignite/internal/sql/engine/exec/rel/Inbox.java |   7 +-
 .../internal/sql/engine/exec/rel/Outbox.java       |   7 +-
 .../sql/engine/message/QueryStartRequest.java      |   4 +
 .../sql/engine/exec/ExecutionServiceImplTest.java  | 100 ++++++++++++++++++---
 .../sql/engine/exec/RuntimeSortedIndexTest.java    |   3 +-
 .../exec/mapping/MappingServiceImplTest.java       |  24 ++---
 .../sql/engine/exec/mapping/MappingTestRunner.java |   4 +-
 .../sql/engine/exec/rel/AbstractExecutionTest.java |   3 +-
 .../sql/engine/framework/TestBuilders.java         |   3 +-
 17 files changed, 276 insertions(+), 95 deletions(-)

diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlOperationContext.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlOperationContext.java
index ac0e697ebbf..737270b9ff5 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlOperationContext.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlOperationContext.java
@@ -51,6 +51,7 @@ public final class SqlOperationContext {
     private final @Nullable Consumer<QueryTransactionWrapper> txUsedListener;
     private final @Nullable Consumer<Throwable> errorListener;
     private final @Nullable String userName;
+    private final @Nullable Long topologyVersion;
 
     /**
      * Private constructor, used by a builder.
@@ -65,7 +66,8 @@ public final class SqlOperationContext {
             @Nullable String defaultSchemaName,
             @Nullable Consumer<QueryTransactionWrapper> txUsedListener,
             @Nullable Consumer<Throwable> errorListener,
-            @Nullable String userName
+            @Nullable String userName,
+            @Nullable Long topologyVersion
     ) {
         this.queryId = queryId;
         this.timeZoneId = timeZoneId;
@@ -77,6 +79,7 @@ public final class SqlOperationContext {
         this.txUsedListener = txUsedListener;
         this.errorListener = errorListener;
         this.userName = userName;
+        this.topologyVersion = topologyVersion;
     }
 
     public static Builder builder() {
@@ -130,6 +133,15 @@ public final class SqlOperationContext {
         return txContext;
     }
 
+    /**
+     * Returns the topology version the query was mapped on.
+     * 
+     * <p>May be {@code null} if the node is the initiator.
+     */
+    public @Nullable Long topologyVersion() {
+        return topologyVersion;
+    }
+
     /**
      * Notifies context that transaction was used for query execution.
      */
@@ -186,6 +198,7 @@ public final class SqlOperationContext {
         private @Nullable QueryCancel cancel;
         private @Nullable String defaultSchemaName;
         private @Nullable String userName;
+        private @Nullable Long topologyVersion;
 
         public Builder cancel(@Nullable QueryCancel cancel) {
             this.cancel = requireNonNull(cancel);
@@ -237,6 +250,11 @@ public final class SqlOperationContext {
             return this;
         }
 
+        public Builder topologyVersion(@Nullable Long topologyVersion) {
+            this.topologyVersion = topologyVersion;
+            return this;
+        }
+
         /** Creates new context. */
         public SqlOperationContext build() {
             return new SqlOperationContext(
@@ -249,7 +267,8 @@ public final class SqlOperationContext {
                     defaultSchemaName,
                     txUsedListener,
                     errorListener,
-                    userName
+                    userName,
+                    topologyVersion
             );
         }
     }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 817cda00ec3..4a485c6a54d 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -363,6 +363,8 @@ public class SqlQueryProcessor implements QueryProcessor, 
SystemViewProvider {
         );
 
         logicalTopologyService.addEventListener(mappingService);
+        logicalTopologyService.addEventListener(mailboxRegistry);
+
         placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, 
mappingService::onPrimaryReplicaExpired);
         // Need to be implemented after 
https://issues.apache.org/jira/browse/IGNITE-23519 Add an event for lease 
Assignments
         // placementDriver.listen(PrimaryReplicaEvent.ASSIGNMENTS_CHANGED, 
mappingService::onPrimaryReplicaAssignment);
@@ -409,8 +411,7 @@ public class SqlQueryProcessor implements QueryProcessor, 
SystemViewProvider {
 
         queriesViewProvider.init(queryExecutor, prepareSvc);
 
-        clusterSrvc.topologyService().addEventHandler(executionSrvc);
-        clusterSrvc.topologyService().addEventHandler(mailboxRegistry);
+        logicalTopologyService.addEventListener(executionSrvc);
 
         registerService(sqlStatisticManager);
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
index cbda6b5c819..6e5462c3933 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
@@ -110,6 +110,8 @@ public class ExecutionContext<RowT> implements DataContext {
 
     private SharedState sharedState = new SharedState();
 
+    private final @Nullable Long topologyVersion;
+
     /**
      * Constructor.
      *
@@ -126,8 +128,8 @@ public class ExecutionContext<RowT> implements DataContext {
      * @param inBufSize Default execution nodes' internal buffer size. 
Negative value means default value.
      * @param clock The clock to use to get the system time.
      * @param username Authenticated user name or {@code null} for unknown 
user.
+     * @param topologyVersion Topology version the query was mapped on.
      */
-    @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
     public ExecutionContext(
             ExpressionFactory<RowT> expressionFactory,
             QueryTaskExecutor executor,
@@ -142,7 +144,8 @@ public class ExecutionContext<RowT> implements DataContext {
             ZoneId timeZoneId,
             int inBufSize,
             Clock clock,
-            @Nullable String username
+            @Nullable String username,
+            @Nullable Long topologyVersion
     ) {
         this.expressionFactory = expressionFactory;
         this.executor = executor;
@@ -157,6 +160,7 @@ public class ExecutionContext<RowT> implements DataContext {
         this.timeZoneId = timeZoneId;
         this.inBufSize = inBufSize < 0 ? Commons.IN_BUFFER_SIZE : inBufSize;
         this.currentUser = username;
+        this.topologyVersion = topologyVersion;
 
         assert this.inBufSize > 0 : this.inBufSize;
 
@@ -314,6 +318,11 @@ public class ExecutionContext<RowT> implements DataContext 
{
         }
     }
 
+    /** Returns the topology version the query was mapped on. */
+    public @Nullable Long topologyVersion() {
+        return topologyVersion;
+    }
+
     /** Gets dynamic parameters by name. */
     private @Nullable Object getParameter(String name) {
         assert name.startsWith("?") : name;
@@ -398,7 +407,7 @@ public class ExecutionContext<RowT> implements DataContext {
                 Throwable unwrappedException = ExceptionUtils.unwrapCause(e);
                 onError.accept(unwrappedException);
 
-                if (unwrappedException instanceof IgniteException 
+                if (unwrappedException instanceof IgniteException
                         || unwrappedException instanceof 
IgniteInternalException
                         || unwrappedException instanceof IgniteCheckedException
                         || unwrappedException instanceof 
IgniteInternalCheckedException
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index 11640decc3e..d7718550a4d 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -60,6 +60,9 @@ import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.ignite.internal.catalog.CatalogCommand;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
 import org.apache.ignite.internal.components.NodeProperties;
 import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -70,7 +73,6 @@ import org.apache.ignite.internal.lang.IgniteStringBuilder;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.InternalClusterNode;
-import org.apache.ignite.internal.network.TopologyEventHandler;
 import org.apache.ignite.internal.network.TopologyService;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
@@ -96,6 +98,7 @@ import 
org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup;
 import org.apache.ignite.internal.sql.engine.exec.mapping.FragmentDescription;
 import org.apache.ignite.internal.sql.engine.exec.mapping.FragmentPrinter;
 import org.apache.ignite.internal.sql.engine.exec.mapping.MappedFragment;
+import org.apache.ignite.internal.sql.engine.exec.mapping.MappedFragments;
 import org.apache.ignite.internal.sql.engine.exec.mapping.MappingParameters;
 import org.apache.ignite.internal.sql.engine.exec.mapping.MappingService;
 import org.apache.ignite.internal.sql.engine.exec.mapping.MappingUtils;
@@ -140,7 +143,7 @@ import org.jetbrains.annotations.TestOnly;
 /**
  * Provide ability to execute SQL query plan and retrieve results of the 
execution.
  */
-public class ExecutionServiceImpl<RowT> implements ExecutionService, 
TopologyEventHandler, Debuggable {
+public class ExecutionServiceImpl<RowT> implements ExecutionService, 
LogicalTopologyEventListener, Debuggable {
     private static final int CACHE_SIZE = 1024;
     private static final IgniteLogger LOG = 
Loggers.forClass(ExecutionServiceImpl.class);
     private static final SqlQueryMessagesFactory FACTORY = new 
SqlQueryMessagesFactory();
@@ -385,7 +388,12 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
     }
 
     private static SqlOperationContext createOperationContext(
-            UUID queryId, ZoneId timeZoneId, Object[] params, HybridTimestamp 
operationTime, @Nullable String username
+            UUID queryId, 
+            ZoneId timeZoneId, 
+            Object[] params, 
+            HybridTimestamp operationTime, 
+            @Nullable String username,
+            @Nullable Long topologyVersion
     ) {
         return SqlOperationContext.builder()
                 .queryId(queryId)
@@ -393,6 +401,7 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
                 .timeZoneId(timeZoneId)
                 .operationTime(operationTime)
                 .userName(username)
+                .topologyVersion(topologyVersion)
                 .build();
     }
 
@@ -478,7 +487,9 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
                 operationContext.timeZoneId(),
                 -1,
                 Clock.systemUTC(),
-                operationContext.userName()
+                operationContext.userName(),
+                // ExecutablePlan uses no mapping, so there is no topology version to pass.
+                null
         );
 
         QueryTransactionContext txContext = operationContext.txContext();
@@ -559,7 +570,7 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
                 return new 
IteratorToDataCursorAdapter<>(List.of(res).iterator());
             }
             case MAPPING:
-                CompletableFuture<List<MappedFragment>> mappedFragments;
+                CompletableFuture<MappedFragments> mappedFragments;
                 if (plan.plan() instanceof MultiStepPlan) {
                     QueryTransactionContext txContext = 
operationContext.txContext();
 
@@ -579,13 +590,12 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
 
                     mappedFragments = mappingService.map((MultiStepPlan) 
plan.plan(), mappingParameters);
                 } else {
-                    mappedFragments = completedFuture(List.of(
-                            
MappingUtils.createSingleNodeMapping(localNode.name(), plan.plan().getRel())
-                    ));
+                    MappedFragment singleNodeMapping = 
MappingUtils.createSingleNodeMapping(localNode.name(), plan.plan().getRel());
+                    mappedFragments = completedFuture(new 
MappedFragments(List.of(singleNodeMapping), 0));
                 }
 
                 CompletableFuture<Iterator<InternalSqlRow>> fragments0 =
-                        mappedFragments.thenApply(mfs -> 
FragmentPrinter.fragmentsToString(false, mfs))
+                        mappedFragments.thenApply(mfs -> 
FragmentPrinter.fragmentsToString(false, mfs.fragments()))
                                 .thenApply(InternalSqlRowSingleString::new)
                                 .thenApply(InternalSqlRow.class::cast)
                                 .thenApply(List::of)
@@ -686,8 +696,8 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
 
     /** {@inheritDoc} */
     @Override
-    public void onDisappeared(InternalClusterNode member) {
-        queryManagerMap.values().forEach(qm -> qm.onNodeLeft(member.name()));
+    public void onNodeLeft(LogicalNode leftNode, LogicalTopologySnapshot 
newTopology) {
+        queryManagerMap.values().forEach(qm -> qm.onNodeLeft(leftNode.name(), 
newTopology.version()));
     }
 
     /** Returns local fragments for the query with given id. */
@@ -702,7 +712,14 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
     private void submitFragment(InternalClusterNode initiatorNode, 
QueryStartRequest msg) {
         DistributedQueryManager queryManager = 
getOrCreateQueryManager(initiatorNode.name(), msg);
 
-        queryManager.submitFragment(initiatorNode, msg.catalogVersion(), 
msg.root(), msg.fragmentDescription(), msg.txAttributes());
+        queryManager.submitFragment(
+                initiatorNode, 
+                msg.catalogVersion(), 
+                msg.root(), 
+                msg.fragmentDescription(), 
+                msg.txAttributes(),
+                msg.topologyVersion()
+        );
     }
 
     private void handleError(Throwable ex, String nodeName, QueryStartRequest 
msg) {
@@ -714,7 +731,12 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
     private DistributedQueryManager getOrCreateQueryManager(String 
coordinatorNodeName, QueryStartRequest msg) {
         return queryManagerMap.computeIfAbsent(new ExecutionId(msg.queryId(), 
msg.executionToken()), key -> {
             SqlOperationContext operationContext = createOperationContext(
-                    key.queryId(), ZoneId.of(msg.timeZoneId()), 
msg.parameters(), msg.operationTime(), msg.username()
+                    key.queryId(), 
+                    ZoneId.of(msg.timeZoneId()), 
+                    msg.parameters(), 
+                    msg.operationTime(), 
+                    msg.username(),
+                    msg.topologyVersion()
             );
 
             return new DistributedQueryManager(key, coordinatorNodeName, 
operationContext);
@@ -887,6 +909,9 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
 
         private volatile Long rootFragmentId = null;
 
+        /** On the initiator this field is assigned when mapping completes. */
+        private volatile @Nullable Long topologyVersion;
+
         private DistributedQueryManager(
                 ExecutionId executionId,
                 String coordinatorNodeName,
@@ -911,9 +936,15 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
             } else {
                 this.root = null;
             }
+
+            this.topologyVersion = ctx.topologyVersion();
         }
 
-        private DistributedQueryManager(ExecutionId executionId, String 
coordinatorNodeName, SqlOperationContext ctx) {
+        private DistributedQueryManager(
+                ExecutionId executionId,
+                String coordinatorNodeName, 
+                SqlOperationContext ctx
+        ) {
             this(executionId, coordinatorNodeName, false, ctx);
         }
 
@@ -922,7 +953,12 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
         }
 
         private CompletableFuture<Void> sendFragment(
-                String targetNodeName, String serialisedFragment, 
FragmentDescription desc, TxAttributes txAttributes, int catalogVersion
+                String targetNodeName,
+                String serialisedFragment,
+                FragmentDescription desc, 
+                TxAttributes txAttributes, 
+                int catalogVersion,
+                long topologyVersion
         ) {
             QueryStartRequest request = FACTORY.queryStartRequest()
                     .queryId(executionId.queryId())
@@ -937,6 +973,7 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
                     .operationTime(ctx.operationTime())
                     .timestamp(clockService.now())
                     .username(ctx.userName())
+                    .topologyVersion(topologyVersion)
                     .build();
 
             return messageService.send(targetNodeName, request);
@@ -968,7 +1005,11 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
             });
         }
 
-        private void onNodeLeft(String nodeName) {
+        private void onNodeLeft(String nodeName, long topologyVersion) {
+            Long mappingTopologyVersion = this.topologyVersion;
+            if (mappingTopologyVersion != null && mappingTopologyVersion > 
topologyVersion) {
+                return; // Ignore outdated event.
+            }
             remoteFragmentInitCompletion.entrySet().stream()
                     .filter(e -> nodeName.equals(e.getKey().nodeName()))
                     .forEach(e -> e.getValue().completeExceptionally(new 
NodeLeftException(nodeName)));
@@ -1018,7 +1059,8 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
         private ExecutionContext<RowT> createContext(
                 InternalClusterNode initiatorNode,
                 FragmentDescription desc,
-                TxAttributes txAttributes
+                TxAttributes txAttributes,
+                @Nullable Long topologyVersion
         ) {
             return new ExecutionContext<>(
                     expressionFactory,
@@ -1034,7 +1076,8 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
                     ctx.timeZoneId(),
                     -1,
                     Clock.systemUTC(),
-                    ctx.userName()
+                    ctx.userName(),
+                    topologyVersion
             );
         }
 
@@ -1043,10 +1086,11 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
                 int catalogVersion,
                 String fragmentString,
                 FragmentDescription desc,
-                TxAttributes txAttributes
+                TxAttributes txAttributes,
+                @Nullable Long topologyVersion
         ) {
             try {
-                ExecutionContext<RowT> context = createContext(initiatorNode, 
desc, txAttributes);
+                ExecutionContext<RowT> context = createContext(initiatorNode, 
desc, txAttributes, topologyVersion);
                 IgniteRel treeRoot = 
relationalTreeFromJsonString(catalogVersion, fragmentString);
 
                 ResolvedDependencies resolvedDependencies = 
dependencyResolver.resolveDependencies(List.of(treeRoot), catalogVersion);
@@ -1099,8 +1143,13 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
         private CompletableFuture<AsyncCursor<InternalSqlRow>> sendFragments(
                 InternalTransaction tx,
                 MultiStepPlan multiStepPlan,
-                List<MappedFragment> mappedFragments
+                MappedFragments mappedFragmentList
         ) {
+            long topologyVersion = mappedFragmentList.topologyVersion();
+            this.topologyVersion = topologyVersion;
+
+            List<MappedFragment> mappedFragments = 
mappedFragmentList.fragments();
+
             // we rely on the fact that the very first fragment is a root. 
Otherwise we need to handle
             // the case when a non-root fragment will fail before the root is 
processed.
             assert !nullOrEmpty(mappedFragments) && 
mappedFragments.get(0).fragment().rootFragment()
@@ -1161,7 +1210,12 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
 
                 for (String nodeName : mappedFragment.nodes()) {
                     CompletableFuture<Void> resultOfSending =
-                            sendFragment(nodeName, fragment.serialized(), 
fragmentDesc, attributes, multiStepPlan.catalogVersion());
+                            sendFragment(nodeName, 
+                                    fragment.serialized(), 
+                                    fragmentDesc, attributes,
+                                    multiStepPlan.catalogVersion(),
+                                    topologyVersion
+                            );
 
                     resultOfSending.whenComplete((ignored, t) -> {
                         if (t == null) {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistryImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistryImpl.java
index fdc51ca4cfe..9d31088b645 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistryImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistryImpl.java
@@ -20,10 +20,11 @@ package org.apache.ignite.internal.sql.engine.exec;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.network.InternalClusterNode;
-import org.apache.ignite.internal.network.TopologyEventHandler;
 import org.apache.ignite.internal.sql.engine.exec.rel.Inbox;
 import org.apache.ignite.internal.sql.engine.exec.rel.Outbox;
 import org.apache.ignite.internal.tostring.S;
@@ -32,7 +33,7 @@ import org.apache.ignite.internal.tostring.S;
  * MailboxRegistryImpl.
  * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
  */
-public class MailboxRegistryImpl implements MailboxRegistry, 
TopologyEventHandler {
+public class MailboxRegistryImpl implements MailboxRegistry, 
LogicalTopologyEventListener {
     private static final IgniteLogger LOG = 
Loggers.forClass(MailboxRegistryImpl.class);
 
     private final Map<MailboxKey, CompletableFuture<Outbox<?>>> locals;
@@ -130,15 +131,9 @@ public class MailboxRegistryImpl implements 
MailboxRegistry, TopologyEventHandle
 
     /** {@inheritDoc} */
     @Override
-    public void onAppeared(InternalClusterNode member) {
-        // NO-OP
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public void onDisappeared(InternalClusterNode member) {
-        locals.values().forEach(fut -> fut.thenAccept(n -> 
n.onNodeLeft(member)));
-        remotes.values().forEach(n -> n.onNodeLeft(member));
+    public void onNodeLeft(LogicalNode leftNode, LogicalTopologySnapshot 
newTopology) {
+        locals.values().forEach(fut -> fut.thenAccept(n -> 
n.onNodeLeft(leftNode, newTopology.version())));
+        remotes.values().forEach(n -> n.onNodeLeft(leftNode, 
newTopology.version()));
     }
 
     private static class MailboxKey {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingService.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappedFragments.java
similarity index 60%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingService.java
copy to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappedFragments.java
index dc778cab542..5b1bf07e52b 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingService.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappedFragments.java
@@ -18,19 +18,29 @@
 package org.apache.ignite.internal.sql.engine.exec.mapping;
 
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan;
 
 /**
- * A service to map multi step plan to an actual topology.
+ * Mapped fragments together with the cluster topology version they were mapped on.
  */
-@FunctionalInterface
-public interface MappingService {
-    /**
-     * Maps given plan to a cluster topology.
-     *
-     * @param multiStepPlan A plan to map.
-     * @return A list of fragments with metadata related to a fragment 
topology.
-     */
-    CompletableFuture<List<MappedFragment>> map(MultiStepPlan multiStepPlan, 
MappingParameters parameters);
+public final class MappedFragments {
+
+    private final long topologyVersion;
+
+    private final List<MappedFragment> fragments;
+
+    /** Constructor. */
+    public MappedFragments(List<MappedFragment> fragments, long 
topologyVersion) {
+        this.fragments = fragments;
+        this.topologyVersion = topologyVersion;
+    }
+
+    /** Cluster topology version these fragments were mapped on. */
+    public long topologyVersion() {
+        return topologyVersion;
+    }
+
+    /** Fragments. */
+    public List<MappedFragment> fragments() {
+        return fragments;
+    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingService.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingService.java
index dc778cab542..e70f82be55e 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingService.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingService.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.sql.engine.exec.mapping;
 
-import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan;
 
@@ -32,5 +31,5 @@ public interface MappingService {
      * @param multiStepPlan A plan to map.
      * @return A list of fragments with metadata related to a fragment 
topology.
      */
-    CompletableFuture<List<MappedFragment>> map(MultiStepPlan multiStepPlan, 
MappingParameters parameters);
+    CompletableFuture<MappedFragments> map(MultiStepPlan multiStepPlan, 
MappingParameters parameters);
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
index 9ffa1119238..d66c71762dd 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
@@ -143,7 +143,7 @@ public class MappingServiceImpl implements MappingService, 
LogicalTopologyEventL
     }
 
     @Override
-    public CompletableFuture<List<MappedFragment>> map(MultiStepPlan 
multiStepPlan, MappingParameters parameters) {
+    public CompletableFuture<MappedFragments> map(MultiStepPlan multiStepPlan, 
MappingParameters parameters) {
         if (initialTopologyFuture.isDone()) {
             return map0(multiStepPlan, parameters);
         }
@@ -151,7 +151,7 @@ public class MappingServiceImpl implements MappingService, 
LogicalTopologyEventL
         return initialTopologyFuture.thenComposeAsync(ignore -> 
map0(multiStepPlan, parameters), taskExecutor);
     }
 
-    private CompletableFuture<List<MappedFragment>> map0(MultiStepPlan 
multiStepPlan, MappingParameters parameters) {
+    private CompletableFuture<MappedFragments> map0(MultiStepPlan 
multiStepPlan, MappingParameters parameters) {
         FragmentsTemplate template = getOrCreateTemplate(multiStepPlan);
 
         boolean mapOnBackups = parameters.mapOnBackups();
@@ -160,7 +160,7 @@ public class MappingServiceImpl implements MappingService, 
LogicalTopologyEventL
         PartitionPruningMetadata partitionPruningMetadata = 
multiStepPlan.partitionPruningMetadata();
         TopologySnapshot topologySnapshot = topologyHolder.topology();
 
-        CompletableFuture<MappedFragments> mappedFragments;
+        CompletableFuture<MappedFragmentsWithNodes> mappedFragments;
         // TODO: https://issues.apache.org/jira/browse/IGNITE-26465 enable 
cache
         // if (nodeExclusionFilter != null) {
         //     mappedFragments = mapFragments(
@@ -176,7 +176,8 @@ public class MappingServiceImpl implements MappingService, 
LogicalTopologyEventL
                 template, mapOnBackups, 
composeNodeExclusionFilter(topologySnapshot, parameters)
         );
 
-        return mappedFragments.thenApply(frags -> 
applyPartitionPruning(frags.fragments, parameters, partitionPruningMetadata));
+        return mappedFragments.thenApply(frags -> 
applyPartitionPruning(frags.fragments, parameters, partitionPruningMetadata))
+                .thenApply(frags -> new MappedFragments(frags, 
topologySnapshot.version));
     }
 
     private MappingsCacheValue computeMappingCacheKey(
@@ -265,7 +266,7 @@ public class MappingServiceImpl implements MappingService, 
LogicalTopologyEventL
         }
     }
 
-    private CompletableFuture<MappedFragments> mapFragments(
+    private CompletableFuture<MappedFragmentsWithNodes> mapFragments(
             FragmentsTemplate template,
             boolean mapOnBackups,
             Predicate<String> nodeExclusionFilter
@@ -360,7 +361,7 @@ public class MappingServiceImpl implements MappingService, 
LogicalTopologyEventL
                 targetNodes.addAll(mappedFragment.nodes());
             }
 
-            return new MappedFragments(mappedFragmentsList, targetNodes);
+            return new MappedFragmentsWithNodes(mappedFragmentsList, 
targetNodes);
         });
     }
 
@@ -454,11 +455,11 @@ public class MappingServiceImpl implements 
MappingService, LogicalTopologyEventL
     }
 
     /** Wraps list of mapped fragments with target node names. */
-    private static class MappedFragments {
+    private static class MappedFragmentsWithNodes {
         final List<MappedFragment> fragments;
         final Set<String> nodes;
 
-        MappedFragments(List<MappedFragment> fragments, Set<String> nodes) {
+        MappedFragmentsWithNodes(List<MappedFragment> fragments, Set<String> 
nodes) {
             this.fragments = fragments;
             this.nodes = nodes;
         }
@@ -470,7 +471,7 @@ public class MappingServiceImpl implements MappingService, 
LogicalTopologyEventL
         // TODO: https://issues.apache.org/jira/browse/IGNITE-26465 enable 
cache
         // private final CompletableFuture<MappedFragments> mappedFragments;
 
-        MappingsCacheValue(long topologyVersion, IntSet tableOrZoneIds, 
CompletableFuture<MappedFragments> mappedFragments) {
+        MappingsCacheValue(long topologyVersion, IntSet tableOrZoneIds, 
CompletableFuture<MappedFragmentsWithNodes> mappedFragments) {
             this.topologyVersion = topologyVersion;
             this.tableOrZoneIds = tableOrZoneIds;
             // TODO: https://issues.apache.org/jira/browse/IGNITE-26465 enable 
cache
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
index 6fdb17cfdf1..3625db32d30 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
@@ -392,7 +392,12 @@ public class Inbox<RowT> extends AbstractNode<RowT> 
implements Mailbox<RowT>, Si
     }
 
     /** Notifies the inbox that provided node has left the cluster. */
-    public void onNodeLeft(InternalClusterNode node) {
+    public void onNodeLeft(InternalClusterNode node, long version) {
+        Long topologyVersion = context().topologyVersion();
+        if (topologyVersion != null && topologyVersion > version) {
+            return; // Ignore outdated event.
+        }
+
         if (srcNodeNames.contains(node.name())) {
             this.execute(() -> onNodeLeft0(node.name()));
         }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
index 7410fadcafc..bd286a1e8aa 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
@@ -356,7 +356,12 @@ public class Outbox<RowT> extends AbstractNode<RowT> 
implements Mailbox<RowT>, S
     }
 
     /** Notifies the outbox that provided node has left the cluster. */
-    public void onNodeLeft(InternalClusterNode node) {
+    public void onNodeLeft(InternalClusterNode node, long version) {
+        Long topologyVersion = context().topologyVersion();
+        if (topologyVersion != null && topologyVersion > version) {
+            return; // Ignore outdated event.
+        }
+
         if (node.id().equals(context().originatingNodeId())) {
             this.execute(this::close);
         }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java
index a4d058529f2..9b0ae4c693c 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java
@@ -70,4 +70,8 @@ public interface QueryStartRequest extends TimestampAware, 
ExecutionContextAware
     /** Name of user who starts the query. */
     @Nullable
     String username();
+
+    /** The version of the cluster logical topology this query was mapped on. 
*/
+    @Nullable
+    Long topologyVersion();
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index 0e5a6c11ce0..ef1e716e09c 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -69,6 +69,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
 import java.util.function.Function;
@@ -78,7 +79,9 @@ import java.util.stream.Stream;
 import org.apache.ignite.internal.catalog.CatalogApplyResult;
 import org.apache.ignite.internal.catalog.CatalogCommand;
 import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
 import org.apache.ignite.internal.components.SystemPropertiesNodeProperties;
 import org.apache.ignite.internal.event.AbstractEventProducer;
 import org.apache.ignite.internal.failure.FailureManager;
@@ -580,7 +583,18 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
         // start response trigger
         CountDownLatch startResponse = new CountDownLatch(1);
 
+        AtomicLong currentTopologyVersion = new AtomicLong();
+
         nodeNames.stream().map(testCluster::node).forEach(node -> 
node.interceptor((senderNode, msg, original) -> {
+            if (msg instanceof QueryStartRequest) {
+                QueryStartRequest startRequest = (QueryStartRequest) msg;
+                Long topologyVersion = startRequest.topologyVersion();
+                if (topologyVersion == null) {
+                    throw new IllegalStateException("Topology version is 
missing");
+                }
+                currentTopologyVersion.set(topologyVersion);
+            }
+
             if (node.node.name().equals(nodeNames.get(0))) {
                 // On node_1, hang until an exception from another node fails 
the query to make sure that the root fragment does not execute
                 // before other fragments.
@@ -613,7 +627,10 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
         CompletableFuture<BatchedResult<InternalSqlRow>> resFut = 
cursor.requestNextAsync(9);
 
         startResponse.await();
-        execService.onDisappeared(firstNode);
+        execService.onNodeLeft(
+                new LogicalNode(firstNode, Map.of()), 
+                new LogicalTopologySnapshot(currentTopologyVersion.get(), 
List.of(), randomUUID())
+        );
 
         nodeFailedLatch.countDown();
 
@@ -999,13 +1016,13 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
      * <p>The sequence of events on real cluster is as follow:<ul>
      * <li>Given: cluster of 3 nodes, distribution zone spans all these 
nodes.</li>
      * <li>Node 1 has been restarted.</li>
-     * <li>Notification of 
org.apache.ignite.internal.network.TopologyEventHandler#onDisappeared handlers 
are delayed on node 2 (due to
-     * metastorage lagging or whatever reason).</li>
+     * <li>Notification of 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener#onNodeLeft
 
+     * handlers are delayed on node 2 (due to metastorage lagging or whatever 
reason).</li>
      * <li>Query started from node 1.</li>
      * <li>Root fragment processed locally, QueryBatchRequest came to node 2 
before QueryStartRequest. This step
      * is crucial since it puts not completed future to mailbox registry
      * 
(org.apache.ignite.internal.sql.engine.exec.MailboxRegistryImpl#locals).</li>
-     * <li>TopologyEventHandler's are notified on node 2. This step
+     * <li>LogicalTopologyEventListener's are notified on node 2. This step
      * causes onNodeLeft handler to be chained to the future from previous 
step. QueryStartRequest came to node 2. Query fragment is created
      * an immediately closed by onNodeLeft handler.</li>
      * </ul>
@@ -1042,12 +1059,15 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
                     throw new RuntimeException(e);
                 }
 
-                InternalClusterNode sameNameDifferentIdNode = 
clusterNode(nodeNames.get(0));
-                // Fire NODE_LEFT event on map-node in question. This event 
contains
-                // cluster node with consistent ID equals to ID of 
node-initiator, but
-                // different volatile id. This emulates situation, when 
request prepared
-                // on newer topology outruns event processing from previous 
topology change.
-                
testCluster.node(nodeNames.get(2)).notifyNodeLeft(sameNameDifferentIdNode);
+                QueryStartRequest queryStartRequest = (QueryStartRequest) msg;
+                Long topologyVersion = queryStartRequest.topologyVersion();
+                if (topologyVersion == null) {
+                    throw new IllegalStateException("Topology version is 
missing");
+                }
+
+                InternalClusterNode node = clusterNode(nodeNames.get(2));
+                // This emulates the situation where a request prepared on a newer 
topology outruns event processing from a previous topology change.
+                testCluster.node(nodeNames.get(2)).notifyNodeLeft(node, 
topologyVersion - 1);
             }
 
             return nullCompletedFuture();
@@ -1060,6 +1080,61 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
         await(await(cursorFuture).requestNextAsync(100));
     }
 
+    /**
+     * Tests the scenario where nodes receive a node-left event from the previous 
topology.
+     */
+    @Test
+    void outdatedNodeLeftEventDoesntCauseQueryToHangAllNodes() {
+        QueryPlan plan = prepare("SELECT * FROM test_tbl", createContext());
+
+        AtomicLong currentTopologyVersion = new AtomicLong(); 
+        CountDownLatch latch = new CountDownLatch(1);
+
+        // Triggers node left events with previous topology version for every 
node in the cluster.
+        for (String nodeName : nodeNames) {
+            testCluster.node(nodeName).interceptor((senderNode, msg, original) 
-> {
+                original.onMessage(senderNode, msg);
+
+                if (msg instanceof QueryStartRequest
+                        // Fragment without target is a root.
+                        && ((QueryStartRequest) 
msg).fragmentDescription().target() == null) {
+
+                    QueryStartRequest queryStartRequest = (QueryStartRequest) 
msg;
+                    Long topologyVersion = queryStartRequest.topologyVersion();
+                    if (topologyVersion == null) {
+                        throw new IllegalStateException("Topology version is 
missing");
+                    }
+
+                    currentTopologyVersion.set(topologyVersion);
+                    latch.countDown();
+                }
+
+                if (!(msg instanceof QueryStartRequest)) {
+                    try {
+                        latch.await();
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException("Test was interrupted", e);
+                    }
+                    long previousVersion = currentTopologyVersion.get() - 1;
+
+                    InternalClusterNode node = clusterNode(nodeName);
+                    // This emulates the situation where a request prepared on a newer 
topology receives an event from a previous topology change.
+                    testCluster.node(nodeName).notifyNodeLeft(node, 
previousVersion);
+
+                    return nullCompletedFuture();
+                } else {
+                    return nullCompletedFuture();
+                }
+            });
+        }
+
+        SqlOperationContext ctx = createContext();
+
+        CompletableFuture<AsyncDataCursor<InternalSqlRow>> cursorFuture = 
executionServices.get(0).executePlan(plan, ctx);
+        // Request must not hang.
+        await(await(cursorFuture).requestNextAsync(100));
+    }
+
     @ParameterizedTest
     @MethodSource("txTypes")
     public void transactionRollbackOnError(NoOpTransaction tx) {
@@ -1311,8 +1386,9 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
                 this.mailboxRegistry = mailboxRegistry;
             }
 
-            public void notifyNodeLeft(InternalClusterNode node) {
-                mailboxRegistry.onDisappeared(node);
+            public void notifyNodeLeft(InternalClusterNode node, long 
topologyVersion) {
+                LogicalTopologySnapshot newTopology = new 
LogicalTopologySnapshot(topologyVersion, Set.of(), randomUUID());
+                mailboxRegistry.onNodeLeft(new LogicalNode(node, Map.of()), 
newTopology);
             }
 
             public void dataset(List<Object[]> dataset) {
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
index 1a335c42500..33809cd91e8 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
@@ -131,7 +131,8 @@ public class RuntimeSortedIndexTest extends 
IgniteAbstractTest {
                         SqlCommon.DEFAULT_TIME_ZONE_ID,
                         -1,
                         Clock.systemUTC(),
-                        null
+                        null,
+                        1L
                 ),
                 RelCollations.of(ImmutableIntList.copyOf(idxCols)),
                 (o1, o2) -> {
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
index 3ad5c3ece74..cd59c69b8d1 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
@@ -155,8 +155,8 @@ public class MappingServiceImplTest extends 
BaseIgniteAbstractTest {
 
         mappingService.onTopologyLeap(logicalTopology.getLogicalTopology());
 
-        List<MappedFragment> defaultMapping = await(mappingService.map(PLAN, 
PARAMS));
-        List<MappedFragment> mappingOnBackups = await(mappingService.map(PLAN, 
MappingParameters.MAP_ON_BACKUPS));
+        MappedFragments defaultMapping = await(mappingService.map(PLAN, 
PARAMS));
+        MappedFragments mappingOnBackups = await(mappingService.map(PLAN, 
MappingParameters.MAP_ON_BACKUPS));
 
         verify(execProvider, times(2)).forTable(any(HybridTimestamp.class), 
any(IgniteTable.class), anyBoolean());
 
@@ -175,7 +175,7 @@ public class MappingServiceImplTest extends 
BaseIgniteAbstractTest {
 
         MappingServiceImpl mappingService = 
createMappingServiceNoCache(localNodeName, List.of(localNodeName));
 
-        CompletableFuture<List<MappedFragment>> mappingFuture = 
mappingService.map(PLAN, PARAMS);
+        CompletableFuture<MappedFragments> mappingFuture = 
mappingService.map(PLAN, PARAMS);
 
         assertThat(mappingFuture, willSucceedFast());
     }
@@ -187,11 +187,11 @@ public class MappingServiceImplTest extends 
BaseIgniteAbstractTest {
 
         MappingService service = createMappingService(localNodeName, 
nodeNames, 100);
 
-        List<MappedFragment> defaultMapping = 
await(service.map(PLAN_WITH_SYSTEM_VIEW, PARAMS));
+        MappedFragments defaultMapping = 
await(service.map(PLAN_WITH_SYSTEM_VIEW, PARAMS));
 
-        assertThat(defaultMapping, hasSize(2));
+        assertThat(defaultMapping.fragments(), hasSize(2));
 
-        MappedFragment leafFragment = defaultMapping.stream()
+        MappedFragment leafFragment = defaultMapping.fragments().stream()
                 .filter(fragment -> !fragment.fragment().rootFragment())
                 .findFirst()
                 .orElseThrow();
@@ -201,11 +201,11 @@ public class MappingServiceImplTest extends 
BaseIgniteAbstractTest {
         String nodeToExclude = leafFragment.nodes().get(0);
 
         MappingParameters params = MappingParameters.create(new Object[0], 
false, nodeToExclude::equals);
-        List<MappedFragment> mappingWithExclusion = 
await(service.map(PLAN_WITH_SYSTEM_VIEW, params));
+        MappedFragments mappingWithExclusion = 
await(service.map(PLAN_WITH_SYSTEM_VIEW, params));
 
         assertNotSame(defaultMapping, mappingWithExclusion);
 
-        for (MappedFragment fragment : mappingWithExclusion) {
+        for (MappedFragment fragment : mappingWithExclusion.fragments()) {
             assertThat(nodeToExclude, not(in(fragment.nodes())));
         }
     }
@@ -233,8 +233,8 @@ public class MappingServiceImplTest extends 
BaseIgniteAbstractTest {
 
         mappingService.onTopologyLeap(logicalTopology.getLogicalTopology());
 
-        List<MappedFragment> tableOnlyMapping = await(mappingService.map(PLAN, 
PARAMS));
-        List<MappedFragment> sysViewMapping = 
await(mappingService.map(PLAN_WITH_SYSTEM_VIEW, PARAMS));
+        MappedFragments tableOnlyMapping = await(mappingService.map(PLAN, 
PARAMS));
+        MappedFragments sysViewMapping = 
await(mappingService.map(PLAN_WITH_SYSTEM_VIEW, PARAMS));
 
         verify(execProvider, times(1)).forTable(any(HybridTimestamp.class), 
any(IgniteTable.class), anyBoolean());
         verify(execProvider, times(1)).forSystemView(any());
@@ -297,7 +297,7 @@ public class MappingServiceImplTest extends 
BaseIgniteAbstractTest {
                 Runnable::run
         ));
 
-        List<MappedFragment> mappedFragments = await(mappingService.map(PLAN, 
PARAMS));
+        MappedFragments mappedFragments = await(mappingService.map(PLAN, 
PARAMS));
         verify(execProvider, times(1)).forTable(any(HybridTimestamp.class), 
any(IgniteTable.class), anyBoolean());
 
         // Simulate expiration of the primary replica for non-mapped table - 
the cache entry should not be invalidated.
@@ -349,7 +349,7 @@ public class MappingServiceImplTest extends 
BaseIgniteAbstractTest {
 
         mappingService.onTopologyLeap(logicalTopology.getLogicalTopology());
 
-        List<MappedFragment> mappedFragments = await(mappingService.map(PLAN, 
PARAMS));
+        MappedFragments mappedFragments = await(mappingService.map(PLAN, 
PARAMS));
         verify(execProvider, times(1)).forTable(any(HybridTimestamp.class), 
any(IgniteTable.class), anyBoolean());
 
         // Simulate expiration of the primary replica for non-mapped table - 
the cache entry should not be invalidated.
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunner.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunner.java
index 1c54e7fa6f3..a42c974f784 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunner.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunner.java
@@ -237,7 +237,7 @@ final class MappingTestRunner {
 
         mappingService.onTopologyLeap(snapshot);
 
-        List<MappedFragment> mappedFragments;
+        MappedFragments mappedFragments;
 
         try {
             mappedFragments = await(mappingService.map(plan, 
readFromPrimaryOnly
@@ -256,7 +256,7 @@ final class MappingTestRunner {
             throw new IllegalStateException("Mapped fragments");
         }
 
-        return FragmentPrinter.fragmentsToString(true, mappedFragments);
+        return FragmentPrinter.fragmentsToString(true, 
mappedFragments.fragments());
     }
 
     static class TestCaseDef {
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
index ed8a167f6e2..bba84a5eab4 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
@@ -158,7 +158,8 @@ public abstract class AbstractExecutionTest<T> extends 
IgniteAbstractTest {
                 SqlCommon.DEFAULT_TIME_ZONE_ID,
                 bufferSize,
                 Clock.systemUTC(),
-                null
+                null,
+                1L
         );
 
         contexts.add(executionContext);
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index 2db3f33f24c..318839e179a 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -590,7 +590,8 @@ public class TestBuilders {
                     zoneId,
                     -1,
                     clock,
-                    null
+                    null,
+                    1L
             );
         }
     }

Reply via email to