This is an automated email from the ASF dual-hosted git repository.

korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new e247ad1047 IGNITE-22768 Sql. Worker node left the cluster before 
fragment has been sent (#4697)
e247ad1047 is described below

commit e247ad1047d53e126b5b8786da03ef3e4463bc65
Author: korlov42 <[email protected]>
AuthorDate: Thu Nov 14 15:16:35 2024 +0200

    IGNITE-22768 Sql. Worker node left the cluster before fragment has been 
sent (#4697)
---
 .../sql/engine/ItUnstableTopologyTest.java         |  92 ++++++++
 .../internal/lang/SqlExceptionMapperUtil.java      |  12 +
 .../ignite/internal/sql/engine/QueryCancel.java    |  21 +-
 .../internal/sql/engine/SqlOperationContext.java   |  17 ++
 .../internal/sql/engine/exec/ExecutionService.java |  11 +-
 .../sql/engine/exec/ExecutionServiceImpl.java      | 245 +++++++++------------
 .../exec/fsm/CursorInitializationPhaseHandler.java |  81 ++++---
 .../sql/engine/exec/fsm/QueryExecutionProgram.java |  44 +++-
 .../sql/engine/exec/fsm/QueryExecutor.java         |   2 +-
 .../exec/fsm/ScriptItemExecutionProgram.java       |   6 +-
 .../MappingException.java}                         |  19 +-
 .../sql/engine/exec/mapping/MappingParameters.java |  28 ++-
 .../engine/exec/mapping/MappingServiceImpl.java    |  71 +++---
 .../mapping/largecluster/LargeClusterFactory.java  |  30 ++-
 .../mapping/smallcluster/SmallClusterFactory.java  |  31 ++-
 .../sql/engine/message/MessageService.java         |   3 +
 .../sql/engine/message/MessageServiceImpl.java     |  13 +-
 .../UnknownNodeException.java}                     |  28 ++-
 .../sql/engine/exec/ExecutionServiceImplTest.java  |  89 +++-----
 .../sql/engine/exec/QueryRecoveryTest.java         | 231 +++++++++++++++++++
 .../mapping/ExecutionTargetFactorySelfTest.java    |  32 ++-
 .../exec/mapping/MappingServiceImplTest.java       |  47 +++-
 22 files changed, 808 insertions(+), 345 deletions(-)

diff --git 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItUnstableTopologyTest.java
 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItUnstableTopologyTest.java
new file mode 100644
index 0000000000..73bfb23b42
--- /dev/null
+++ 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItUnstableTopologyTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine;
+
+import static org.apache.ignite.internal.sql.engine.util.QueryChecker.matches;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests to make sure sql engine can recover execution when run on unstable 
topology.
+ */
+public class ItUnstableTopologyTest extends BaseSqlIntegrationTest {
+    private static final String DATA_NODE_BOOTSTRAP_CFG_TEMPLATE = "ignite {\n"
+            + "  network: {\n"
+            + "    port: {},\n"
+            + "    nodeFinder: {\n"
+            + "      netClusterNodes: [ {} ]\n"
+            + "    }\n"
+            + "  },\n"
+            + "  clientConnector: { port:{} },\n"
+            + "  nodeAttributes: {\n"
+            + "    nodeAttributes: {role: {attribute: \"data\"}}\n"
+            + "  },\n"
+            + "  rest.port: {}\n"
+            + "}";
+
+    @Override
+    protected int initialNodes() {
+        return 1;
+    }
+
+    @AfterEach
+    public void dropTables() {
+        dropAllTables();
+        dropAllZonesExceptDefaultOne();
+    }
+
+    @Test
+    public void ensureLostOfNodeDoesntCausesQueryToFail() {
+        CLUSTER.startNode(1, DATA_NODE_BOOTSTRAP_CFG_TEMPLATE);
+        CLUSTER.startNode(2, DATA_NODE_BOOTSTRAP_CFG_TEMPLATE);
+        CLUSTER.startNode(3, DATA_NODE_BOOTSTRAP_CFG_TEMPLATE);
+
+        sql("CREATE ZONE my_zone WITH" 
+                + " partitions = 1," 
+                + " replicas = 3," 
+                + " storage_profiles = 'default'," 
+                + " data_nodes_filter='$[?(@.role == \"data\")]'");
+
+        sql("CREATE TABLE my_table (id INT PRIMARY KEY, val INT) ZONE my_zone 
STORAGE PROFILE 'default'");
+        assertQuery("INSERT INTO my_table SELECT x, x FROM system_range(1, 
1000)")
+                .returns(1000L)
+                .check();
+
+        Ignite gateway = CLUSTER.node(0); 
+
+        assertQuery(gateway, "SELECT count(*) FROM my_table WHERE val > -1")
+                .matches(matches(".*TableScan.*"))
+                .returns(1000L)
+                .check();
+
+        // The choice of node to stop depends on current mapping algorithm 
which sorts
+        // nodes by name and chooses the first one among all options, 
therefore we stop
+        // 1st node -- the first data node. This invariant is fragile, but 
without
+        // EXPLAIN MAPPING FOR <query> it seems there are no other options to 
derive
+        // information about execution nodes for particular query.
+        CLUSTER.stopNode(1);
+
+        assertQuery(gateway, "SELECT count(*) FROM my_table WHERE val > -1")
+                .matches(matches(".*TableScan.*"))
+                .returns(1000L)
+                .check();
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/lang/SqlExceptionMapperUtil.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/lang/SqlExceptionMapperUtil.java
index 33be23771a..c580a84893 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/lang/SqlExceptionMapperUtil.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/lang/SqlExceptionMapperUtil.java
@@ -20,8 +20,12 @@ package org.apache.ignite.internal.lang;
 import static 
org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.mapToPublicException;
 import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 
+import org.apache.ignite.internal.sql.engine.exec.mapping.MappingException;
+import org.apache.ignite.internal.sql.engine.message.UnknownNodeException;
+import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.lang.CursorClosedException;
 import org.apache.ignite.lang.ErrorGroups.Common;
+import org.apache.ignite.lang.ErrorGroups.Sql;
 import org.apache.ignite.lang.TraceableException;
 import org.apache.ignite.sql.SqlException;
 
@@ -48,6 +52,14 @@ public class SqlExceptionMapperUtil {
      * @return Public exception.
      */
     public static Throwable mapToPublicSqlException(Throwable origin) {
+        Throwable unwrapped = ExceptionUtils.unwrapCause(origin);
+        if (unwrapped instanceof MappingException) {
+            return new SqlException(Sql.MAPPING_ERR, unwrapped.getMessage());
+        }
+        if (unwrapped instanceof UnknownNodeException) {
+            return new SqlException(Common.NODE_LEFT_ERR, "Node left the 
cluster. Node: " + ((UnknownNodeException) unwrapped).nodeName());
+        }
+
         Throwable e = mapToPublicException(origin);
         if (e instanceof Error) {
             return e;
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryCancel.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryCancel.java
index 1b1fb5ab66..9154c575a8 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryCancel.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryCancel.java
@@ -43,11 +43,7 @@ public class QueryCancel {
 
         state.thenAccept(reason -> clo.cancel(reason == Reason.TIMEOUT));
 
-        if (state.isDone()) {
-            Reason reason = state.join();
-
-            throwException(reason);
-        }
+        throwIfCancelled();
     }
 
     /**
@@ -62,11 +58,7 @@ public class QueryCancel {
     public void attach(QueryCancel another) throws QueryCancelledException {
         state.thenAccept(another.state::complete);
 
-        if (state.isDone()) {
-            Reason reason = state.join();
-
-            throwException(reason);
-        }
+        throwIfCancelled();
     }
 
     /**
@@ -92,7 +84,14 @@ public class QueryCancel {
         return state.isDone();
     }
 
-    private static void throwException(Reason reason) {
+    /** Throws {@link QueryCancelledException} if operation has already been 
cancelled. */
+    public void throwIfCancelled() throws QueryCancelledException {
+        if (!state.isDone()) {
+            return;
+        }
+
+        Reason reason = state.join();
+
         throw new QueryCancelledException(
                 reason == Reason.TIMEOUT
                         ? QueryCancelledException.TIMEOUT_MSG
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlOperationContext.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlOperationContext.java
index f3e02aa05e..180207e247 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlOperationContext.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlOperationContext.java
@@ -20,8 +20,11 @@ package org.apache.ignite.internal.sql.engine;
 import static java.util.Objects.requireNonNull;
 
 import java.time.ZoneId;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Consumer;
+import java.util.function.Predicate;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import 
org.apache.ignite.internal.sql.engine.SqlQueryProcessor.PrefetchCallback;
 import org.apache.ignite.internal.sql.engine.tx.QueryTransactionContext;
@@ -36,6 +39,8 @@ import org.jetbrains.annotations.Nullable;
  * kick-starting of the statement execution.
  */
 public final class SqlOperationContext {
+    private final Set<String> excludedNodes = ConcurrentHashMap.newKeySet();
+
     private final UUID queryId;
     private final ZoneId timeZoneId;
     private final Object[] parameters;
@@ -146,6 +151,18 @@ public final class SqlOperationContext {
         return operationTime;
     }
 
+    /** Updates the {@link #nodeExclusionFilter()} with given node. */
+    public void excludeNode(String nodeName) {
+        excludedNodes.add(nodeName);
+    }
+
+    /** Returns the predicate to exclude nodes from mapping, or {@code null} 
if all nodes must be used. */
+    public @Nullable Predicate<String> nodeExclusionFilter() {
+        Set<String> excludedNodes = Set.copyOf(this.excludedNodes);
+
+        return excludedNodes.isEmpty() ? null : excludedNodes::contains;
+    }
+
     /**
      * Query context builder.
      */
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionService.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionService.java
index e62e4e576c..5c08a895d6 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionService.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionService.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.sql.engine.exec;
 
+import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.sql.engine.InternalSqlRow;
 import org.apache.ignite.internal.sql.engine.SqlOperationContext;
 import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
@@ -25,7 +26,15 @@ import 
org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
  * SQL query plan execution interface.
  */
 public interface ExecutionService extends LifecycleAware {
-    AsyncDataCursor<InternalSqlRow> executePlan(
+    /**
+     * Executes the given plan.
+     *
+     * @param plan Plan to execute.
+     * @param operationContext Context of operation.
+     * @return Future that will be completed when cursor is successfully 
initialized, implying for distributed plans all fragments have been
+     *         sent successfully.
+     */
+    CompletableFuture<AsyncDataCursor<InternalSqlRow>> executePlan(
             QueryPlan plan, SqlOperationContext operationContext
     );
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index 305c8ce4e6..749583e28d 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -17,10 +17,12 @@
 
 package org.apache.ignite.internal.sql.engine.exec;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static 
org.apache.ignite.internal.sql.engine.externalize.RelJsonReader.fromJson;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
 import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 
 import com.github.benmanes.caffeine.cache.Caffeine;
@@ -52,7 +54,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiFunction;
-import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.ignite.configuration.ConfigurationChangeException;
@@ -62,7 +64,6 @@ import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.IgniteStringBuilder;
-import org.apache.ignite.internal.lang.SqlExceptionMapperUtil;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.TopologyEventHandler;
@@ -291,7 +292,7 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
         return ddlCmdHnd;
     }
 
-    private AsyncDataCursor<InternalSqlRow> executeQuery(
+    private CompletableFuture<AsyncDataCursor<InternalSqlRow>> executeQuery(
             SqlOperationContext operationContext,
             MultiStepPlan plan
     ) {
@@ -318,8 +319,6 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
         QueryTransactionWrapper txWrapper = 
txContext.getOrStartImplicit(plan.type() != SqlQueryType.DML);
         InternalTransaction tx = txWrapper.unwrap();
 
-        AsyncCursor<InternalSqlRow> dataCursor = queryManager.execute(tx, 
plan);
-
         operationContext.notifyTxUsed(txWrapper);
 
         PrefetchCallback prefetchCallback = 
operationContext.prefetchCallback();
@@ -335,11 +334,15 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
             firstPageReady = firstPageReady.thenCompose(none -> 
txWrapper.commitImplicit());
         }
 
-        return new TxAwareAsyncCursor<>(
+        CompletableFuture<Void> firstPageReady0 = firstPageReady;
+
+        Predicate<String> nodeExclusionFilter = 
operationContext.nodeExclusionFilter(); 
+
+        return queryManager.execute(tx, plan, 
nodeExclusionFilter).thenApply(dataCursor -> new TxAwareAsyncCursor<>(
                 txWrapper,
                 dataCursor,
-                firstPageReady
-        );
+                firstPageReady0
+        ));
     }
 
     private static SqlOperationContext createOperationContext(
@@ -365,7 +368,7 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
     /** {@inheritDoc} */
     @Override
     @SuppressWarnings("CastConflictsWithInstanceof") // IDEA incorrectly 
highlights casts in EXPLAIN and DDL branches
-    public AsyncDataCursor<InternalSqlRow> executePlan(
+    public CompletableFuture<AsyncDataCursor<InternalSqlRow>> executePlan(
             QueryPlan plan, SqlOperationContext operationContext
     ) {
         SqlQueryType queryType = plan.type();
@@ -374,16 +377,16 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
             case DML:
             case QUERY:
                 if (plan instanceof ExecutablePlan) {
-                    return executeExecutablePlan(operationContext, 
(ExecutablePlan) plan);
+                    return 
completedFuture(executeExecutablePlan(operationContext, (ExecutablePlan) plan));
                 }
 
                 assert plan instanceof MultiStepPlan : plan.getClass();
 
                 return executeQuery(operationContext, (MultiStepPlan) plan);
             case EXPLAIN:
-                return executeExplain(operationContext, (ExplainPlan) plan);
+                return completedFuture(executeExplain(operationContext, 
(ExplainPlan) plan));
             case DDL:
-                return executeDdl(operationContext, (DdlPlan) plan);
+                return completedFuture(executeDdl(operationContext, (DdlPlan) 
plan));
 
             default:
                 throw new AssertionError("Unexpected query type: " + plan);
@@ -972,79 +975,26 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
             }
         }
 
-        private AsyncCursor<InternalSqlRow> execute(InternalTransaction tx, 
MultiStepPlan multiStepPlan) {
+        private CompletableFuture<AsyncCursor<InternalSqlRow>> execute(
+                InternalTransaction tx,
+                MultiStepPlan multiStepPlan,
+                @Nullable Predicate<String> nodeExclusionFilter
+        ) {
             assert root != null;
 
-            // Query has already been cancelled, return immediately.
-            if (cancelled.get()) {
-                return new AsyncCursor<>() {
-                    @Override
-                    public CompletableFuture<BatchedResult<InternalSqlRow>> 
requestNextAsync(int rows) {
-                        Throwable t = 
SqlExceptionMapperUtil.mapToPublicSqlException(new QueryCancelledException());
-                        return CompletableFuture.failedFuture(t);
-                    }
-
-                    @Override
-                    public CompletableFuture<Void> closeAsync() {
-                        return DistributedQueryManager.this.cancelFut;
-                    }
-                };
-            }
-
             boolean mapOnBackups = tx.isReadOnly();
-            MappingParameters mappingParameters = 
MappingParameters.create(ctx.parameters(), mapOnBackups);
+            MappingParameters mappingParameters = 
MappingParameters.create(ctx.parameters(), mapOnBackups, nodeExclusionFilter);
 
-            mappingService.map(multiStepPlan, 
mappingParameters).whenCompleteAsync((mappedFragments, mappingErr) -> {
-                if (mappingErr != null) {
-                    if (!root.completeExceptionally(mappingErr)) {
-                        root.thenAccept(root -> root.onError(mappingErr));
-                    }
-                    return;
-                }
-
-                try {
-                    sendFragments(tx, multiStepPlan, mappedFragments);
-                } catch (Throwable t) {
-                    LOG.warn("Unexpected exception during query 
initialization", t);
-
-                    if (!root.completeExceptionally(t)) {
-                        root.thenAccept(root -> root.onError(t));
-                    }
-                }
-            }, taskExecutor);
-
-            return new AsyncCursor<>() {
-                @Override
-                public CompletableFuture<BatchedResult<InternalSqlRow>> 
requestNextAsync(int rows) {
-                    return root.thenCompose(cur -> {
-                        CompletableFuture<BatchedResult<InternalSqlRow>> fut = 
cur.requestNextAsync(rows);
-
-                        fut.thenAccept(batch -> {
-                            if (!batch.hasMore()) {
-                                DistributedQueryManager.this.close();
-                            }
-                        });
-
-                        return fut;
-                    });
-                }
-
-                @Override
-                public CompletableFuture<Void> closeAsync() {
-                    return root.handle((ignored, ex) -> {
-                        if (ex != null) {
-                            // cancellation should be triggered by listener of 
exceptional
-                            // completion of `root` future, thus let's just 
return a result here
-                            return DistributedQueryManager.this.cancelFut;
-                        }
-
-                        return DistributedQueryManager.this.close();
-                    }).thenCompose(Function.identity());
-                }
-            };
+            return mappingService.map(multiStepPlan, mappingParameters)
+                    .thenComposeAsync(mappedFragments -> sendFragments(tx, 
multiStepPlan, mappedFragments), taskExecutor)
+                    .thenApply(this::wrapRootNode);
         }
 
-        private void sendFragments(InternalTransaction tx, MultiStepPlan 
multiStepPlan, List<MappedFragment> mappedFragments) {
+        private CompletableFuture<AsyncCursor<InternalSqlRow>> sendFragments(
+                InternalTransaction tx,
+                MultiStepPlan multiStepPlan,
+                List<MappedFragment> mappedFragments
+        ) {
             // we rely on the fact that the very first fragment is a root. 
Otherwise we need to handle
             // the case when a non-root fragment will fail before the root is 
processed.
             assert !nullOrEmpty(mappedFragments) && 
mappedFragments.get(0).fragment().rootFragment()
@@ -1061,8 +1011,8 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
                 QueryCancel queryCancel = ctx.cancel();
 
                 // skipping initialization if cancel has already been triggered
-                if (queryCancel != null && queryCancel.isCancelled()) {
-                    return;
+                if (queryCancel != null) {
+                    queryCancel.throwIfCancelled();
                 }
 
                 // then let's register all remote fragment's initialization 
futures. This need
@@ -1107,62 +1057,51 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
                     CompletableFuture<Void> resultOfSending =
                             sendFragment(nodeName, fragment.serialized(), 
fragmentDesc, attributes, multiStepPlan.catalogVersion());
 
-                    resultsOfFragmentSending.add(
-                            resultOfSending.handle((ignored, t) -> {
-                                if (t == null) {
-                                    return null;
-                                }
-
-                                // if we were unable to send a request, then 
no need
-                                // to wait for the remote node to complete 
initialization
-
-                                CompletableFuture<?> completionFuture = 
remoteFragmentInitCompletion.get(
-                                        new RemoteFragmentKey(nodeName, 
fragment.fragmentId())
-                                );
-
-                                if (completionFuture != null) {
-                                    completionFuture.complete(null);
-                                }
-
-                                throw ExceptionUtils.withCause(
-                                        t instanceof NodeLeftException ? 
NodeLeftException::new : IgniteInternalException::new,
-                                        INTERNAL_ERR,
-                                        format("Unable to send fragment 
[targetNode={}, fragmentId={}, cause={}]",
-                                                nodeName, 
fragment.fragmentId(), t.getMessage()), t
-                                );
-                            })
-                    );
+                    resultOfSending.whenComplete((ignored, t) -> {
+                        if (t == null) {
+                            return;
+                        }
+
+                        // if we were unable to send a request, then no need
+                        // to wait for the remote node to complete 
initialization
+
+                        CompletableFuture<?> completionFuture = 
remoteFragmentInitCompletion.get(
+                                new RemoteFragmentKey(nodeName, 
fragment.fragmentId())
+                        );
+
+                        if (completionFuture != null) {
+                            completionFuture.complete(null);
+                        }
+                    });
+
+                    resultsOfFragmentSending.add(resultOfSending);
                 }
             }
 
-            CompletableFutures.allOf(resultsOfFragmentSending)
+            return CompletableFutures.allOf(resultsOfFragmentSending)
                     .handle((ignoredVal, ignoredTh) -> {
                         if (ignoredTh == null) {
-                            return null;
+                            return root;
                         }
 
-                        Throwable firstFoundError = null;
+                        Throwable error = 
deriveExceptionFromListOfFutures(resultsOfFragmentSending);
 
-                        for (CompletableFuture<?> fut : 
resultsOfFragmentSending) {
-                            if (fut.isCompletedExceptionally()) {
-                                // this is non blocking join() because we are 
inside of CompletableFuture.allOf call
-                                Throwable fromFuture = fut.handle((ignored, 
ex) -> ex).join();
+                        assert error != null;
 
-                                if (firstFoundError == null) {
-                                    firstFoundError = fromFuture;
-                                } else {
-                                    firstFoundError.addSuppressed(fromFuture);
-                                }
-                            }
-                        }
+                        return 
CompletableFutures.allOf(remoteFragmentInitCompletion.values())
+                                .thenCompose(none -> {
+                                    if (!root.completeExceptionally(error)) {
+                                        root.thenAccept(root -> 
root.onError(error));
 
-                        Throwable error = firstFoundError;
-                        if (!root.completeExceptionally(error)) {
-                            root.thenAccept(root -> root.onError(error));
-                        }
+                                        close(QueryCompletionReason.ERROR);
+                                    }
 
-                        return null;
-                    });
+                                    return cancelFut
+                                            .thenRun(() -> sneakyThrow(error));
+                                })
+                                .thenCompose(none -> root);
+                    })
+                    .thenCompose(Commons::cast);
         }
 
         private void enlistPartitions(MappedFragment mappedFragment, 
InternalTransaction tx) {
@@ -1258,17 +1197,6 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
 
                 queryManagerMap.remove(ctx.queryId());
 
-                QueryCancel cancelHandler = ctx.cancel();
-
-                // Query cancel runs only at the coordinator node.
-                if (cancelHandler != null) {
-                    try {
-                        cancelHandler.cancel();
-                    } catch (Exception th) {
-                        LOG.debug("Exception raised while cancel", th);
-                    }
-                }
-
                 cancelFut.complete(null);
             }).thenRun(() -> localFragments.forEach(f -> 
f.context().cancel()));
 
@@ -1366,6 +1294,49 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
 
             return nullCompletedFuture();
         }
+
+        private AsyncCursor<InternalSqlRow> 
wrapRootNode(AsyncCursor<InternalSqlRow> cursor) {
+            return new AsyncCursor<>() {
+                @Override
+                public CompletableFuture<BatchedResult<InternalSqlRow>> 
requestNextAsync(int rows) {
+                    CompletableFuture<BatchedResult<InternalSqlRow>> fut = 
cursor.requestNextAsync(rows);
+
+                    fut.thenAccept(batch -> {
+                        if (!batch.hasMore()) {
+                            DistributedQueryManager.this.close();
+                        }
+                    });
+
+                    return fut;
+                }
+
+                @Override
+                public CompletableFuture<Void> closeAsync() {
+                    return DistributedQueryManager.this.close();
+                }
+            };
+        }
+    }
+
+    private static @Nullable Throwable 
deriveExceptionFromListOfFutures(List<CompletableFuture<?>> futures) {
+        Throwable firstFoundError = null;
+
+        for (CompletableFuture<?> fut : futures) {
+            assert fut.isDone();
+
+            if (fut.isCompletedExceptionally()) {
+                // all futures are expected to be completed by this point
+                Throwable fromFuture = fut.handle((ignored, ex) -> ex).join();
+
+                if (firstFoundError == null) {
+                    firstFoundError = fromFuture;
+                } else {
+                    firstFoundError.addSuppressed(fromFuture);
+                }
+            }
+        }
+
+        return firstFoundError;
     }
 
     /**
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/CursorInitializationPhaseHandler.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/CursorInitializationPhaseHandler.java
index c38b9e70fe..b5eed6a5db 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/CursorInitializationPhaseHandler.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/CursorInitializationPhaseHandler.java
@@ -17,13 +17,14 @@
 
 package org.apache.ignite.internal.sql.engine.exec.fsm;
 
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.sql.engine.AsyncSqlCursorImpl;
 import org.apache.ignite.internal.sql.engine.InternalSqlRow;
 import org.apache.ignite.internal.sql.engine.SqlOperationContext;
 import 
org.apache.ignite.internal.sql.engine.SqlQueryProcessor.PrefetchCallback;
 import org.apache.ignite.internal.sql.engine.SqlQueryType;
-import org.apache.ignite.internal.sql.engine.exec.AsyncDataCursor;
 import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
 import org.apache.ignite.internal.sql.engine.tx.QueryTransactionContext;
 
@@ -41,53 +42,51 @@ class CursorInitializationPhaseHandler implements 
ExecutionPhaseHandler {
         assert plan != null;
         assert context != null;
 
-        AsyncDataCursor<InternalSqlRow> dataCursor = 
query.executor.executePlan(context, plan);
-
         SqlQueryType queryType = plan.type();
-
         PrefetchCallback prefetchCallback = context.prefetchCallback();
 
         assert prefetchCallback != null;
 
-        AsyncSqlCursorImpl<InternalSqlRow> cursor = new AsyncSqlCursorImpl<>(
-                queryType,
-                plan.metadata(),
-                dataCursor,
-                query.nextCursorFuture
-        );
-
-        query.cursor = cursor;
-
-        QueryTransactionContext txContext = query.txContext;
-
-        assert txContext != null;
-
-        if (queryType == SqlQueryType.QUERY) {
-            if (txContext.explicitTx() == null) {
-                // TODO: IGNITE-23604
-                // implicit transaction started by InternalTable doesn't 
update observableTimeTracker. At
-                // this point we don't know whether tx was started by 
InternalTable or ExecutionService, thus
-                // let's update tracker explicitly to preserve consistency
-                txContext.updateObservableTime(query.executor.clockNow());
-            }
-
-            // preserve lazy execution for statements that only reads
-            return Result.completed();
-        }
-
-        // for other types let's wait for the first page to make sure premature
-        // close of the cursor won't cancel an entire operation
-        CompletableFuture<Void> awaitFuture = cursor.onFirstPageReady()
-                .thenApply(none -> {
-                    if (txContext.explicitTx() == null) {
-                        // TODO: IGNITE-23604
-                        // implicit transaction started by InternalTable 
doesn't update observableTimeTracker. At
-                        // this point we don't know whether tx was started by 
InternalTable or ExecutionService, thus
-                        // let's update tracker explicitly to preserve 
consistency
-                        
txContext.updateObservableTime(query.executor.clockNow());
+        CompletableFuture<Void> awaitFuture = 
query.executor.executePlan(context, plan)
+                .thenCompose(dataCursor -> {
+                    AsyncSqlCursorImpl<InternalSqlRow> cursor = new 
AsyncSqlCursorImpl<>(
+                            queryType,
+                            plan.metadata(),
+                            dataCursor,
+                            query.nextCursorFuture
+                    );
+
+                    query.cursor = cursor;
+
+                    QueryTransactionContext txContext = query.txContext;
+
+                    assert txContext != null;
+
+                    if (queryType == SqlQueryType.QUERY) {
+                        if (txContext.explicitTx() == null) {
+                            // TODO: IGNITE-23604
+                            // implicit transaction started by InternalTable 
doesn't update observableTimeTracker. At
+                            // this point we don't know whether tx was started 
by InternalTable or ExecutionService, thus
+                            // let's update tracker explicitly to preserve 
consistency
+                            
txContext.updateObservableTime(query.executor.clockNow());
+                        }
+
+                        // preserve lazy execution for statements that only 
read
+                        return nullCompletedFuture();
                     }
 
-                    return null;
+                    // for other types let's wait for the first page to make 
sure premature
+                    // close of the cursor won't cancel an entire operation
+                    return cursor.onFirstPageReady()
+                            .thenRun(() -> {
+                                if (txContext.explicitTx() == null) {
+                                    // TODO: IGNITE-23604
+                                    // implicit transaction started by 
InternalTable doesn't update observableTimeTracker. At
+                                    // this point we don't know whether tx was 
started by InternalTable or ExecutionService, thus
+                                    // let's update tracker explicitly to 
preserve consistency
+                                    
txContext.updateObservableTime(query.executor.clockNow());
+                                }
+                            });
                 });
 
         return Result.proceedAfter(awaitFuture);
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutionProgram.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutionProgram.java
index f54f7e8a71..f1da01abc6 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutionProgram.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutionProgram.java
@@ -20,6 +20,8 @@ package org.apache.ignite.internal.sql.engine.exec.fsm;
 import java.util.List;
 import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
 import org.apache.ignite.internal.sql.engine.InternalSqlRow;
+import org.apache.ignite.internal.sql.engine.SqlOperationContext;
+import org.apache.ignite.internal.sql.engine.message.UnknownNodeException;
 
 /**
  * Generic execution program, which accepts query string and initialization 
parameters as input, and returns cursor as result.
@@ -58,11 +60,43 @@ class QueryExecutionProgram extends 
Program<AsyncSqlCursor<InternalSqlRow>> {
                 TRANSITIONS,
                 phase -> phase == ExecutionPhase.EXECUTING,
                 query -> query.cursor,
-                (query, throwable) -> {
-                    query.onError(throwable);
-
-                    return false;
-                }
+                QueryExecutionProgram::errorHandler
         );
     }
+
+    static boolean errorHandler(Query query, Throwable th) {
+        if (canRecover(query, th)) {
+            SqlOperationContext context = query.operationContext;
+
+            assert context != null;
+
+            // ensured by canRecover() method
+            assert th instanceof UnknownNodeException : th;
+
+            UnknownNodeException exception = (UnknownNodeException) th;
+
+            context.excludeNode(exception.nodeName());
+
+            return true;
+        }
+
+        query.onError(th);
+
+        return false;
+    }
+
+    @SuppressWarnings("SimplifiableIfStatement")
+    private static boolean canRecover(Query query, Throwable th) {
+        if (query.currentPhase() != ExecutionPhase.CURSOR_INITIALIZATION) {
+            return false;
+        }
+
+        // DataCursor unconditionally rolls back transaction in case of error, 
therefore it's not possible
+        // to recover any explicit transaction at the moment
+        if (query.txContext.explicitTx() != null) {
+            return false;
+        }
+
+        return th instanceof UnknownNodeException;
+    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutor.java
index a634265a9c..084b35fbe9 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutor.java
@@ -284,7 +284,7 @@ public class QueryExecutor implements LifecycleAware {
         return clockService.now();
     }
 
-    AsyncDataCursor<InternalSqlRow> executePlan(
+    CompletableFuture<AsyncDataCursor<InternalSqlRow>> executePlan(
             SqlOperationContext ctx,
             QueryPlan plan
     ) {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ScriptItemExecutionProgram.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ScriptItemExecutionProgram.java
index a7712a2b81..c611c5f59e 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ScriptItemExecutionProgram.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ScriptItemExecutionProgram.java
@@ -49,11 +49,7 @@ class ScriptItemExecutionProgram extends 
Program<AsyncSqlCursor<InternalSqlRow>>
                 TRANSITIONS,
                 phase -> phase == ExecutionPhase.EXECUTING,
                 query -> query.cursor,
-                (query, throwable) -> {
-                    query.onError(throwable);
-
-                    return false;
-                }
+                QueryExecutionProgram::errorHandler
         );
     }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionService.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingException.java
similarity index 63%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionService.java
copy to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingException.java
index e62e4e576c..ec6a56d78b 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionService.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingException.java
@@ -15,17 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.exec;
+package org.apache.ignite.internal.sql.engine.exec.mapping;
 
-import org.apache.ignite.internal.sql.engine.InternalSqlRow;
-import org.apache.ignite.internal.sql.engine.SqlOperationContext;
-import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
+/** Generic mapping exception. */
+public class MappingException extends RuntimeException {
+    private static final long serialVersionUID = -5428003837694156132L;
 
-/**
- * SQL query plan execution interface.
- */
-public interface ExecutionService extends LifecycleAware {
-    AsyncDataCursor<InternalSqlRow> executePlan(
-            QueryPlan plan, SqlOperationContext operationContext
-    );
+    /** Constructs the object. */
+    public MappingException(String message) {
+        super(message, null, true, false);
+    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingParameters.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingParameters.java
index 1e94ff3b89..3eb2faca33 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingParameters.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingParameters.java
@@ -17,42 +17,51 @@
 
 package org.apache.ignite.internal.sql.engine.exec.mapping;
 
+import java.util.function.Predicate;
 import org.apache.ignite.internal.util.ArrayUtils;
+import org.jetbrains.annotations.Nullable;
 
 /** Additional information used for mapping. */
 public class MappingParameters {
-
     /** Empty mapping parameters. */
-    public static final MappingParameters EMPTY = new 
MappingParameters(ArrayUtils.OBJECT_EMPTY_ARRAY, false);
+    public static final MappingParameters EMPTY = new 
MappingParameters(ArrayUtils.OBJECT_EMPTY_ARRAY, false, null);
 
     /** Allow map on backups. */
-    public static final MappingParameters MAP_ON_BACKUPS = new 
MappingParameters(ArrayUtils.OBJECT_EMPTY_ARRAY, true);
+    public static final MappingParameters MAP_ON_BACKUPS = new 
MappingParameters(ArrayUtils.OBJECT_EMPTY_ARRAY, true, null);
 
     private final boolean mapOnBackups;
     private final Object[] dynamicParameters;
+    private final @Nullable Predicate<String> nodeExclusionFilter;
 
     /**
      * Creates mapping parameters.
      *
      * @param dynamicParameters Dynamic parameters.
      * @param mapOnBackups Whether to use non-primary replicas for mapping or 
not.
+     * @param nodeExclusionFilter If provided, all nodes which meet the 
predicate must be excluded from mapping.
      *
      * @return Mapping parameters.
      */
-    public static MappingParameters create(Object[] dynamicParameters, boolean 
mapOnBackups) {
-        if (dynamicParameters.length == 0) {
+    public static MappingParameters create(
+            Object[] dynamicParameters,
+            boolean mapOnBackups,
+            @Nullable Predicate<String> nodeExclusionFilter
+    ) {
+        if (dynamicParameters.length == 0 && nodeExclusionFilter == null) {
             return mapOnBackups ? MAP_ON_BACKUPS : EMPTY;
         } else {
-            return new MappingParameters(dynamicParameters, mapOnBackups);
+            return new MappingParameters(dynamicParameters, mapOnBackups, 
nodeExclusionFilter);
         }
     }
 
     private MappingParameters(
             Object[] dynamicParameters,
-            boolean mapOnBackups
+            boolean mapOnBackups,
+            @Nullable Predicate<String> nodeExclusionFilter
     ) {
         this.dynamicParameters = dynamicParameters;
         this.mapOnBackups = mapOnBackups;
+        this.nodeExclusionFilter = nodeExclusionFilter;
     }
 
     /** Values of dynamic parameters. */
@@ -63,4 +72,9 @@ public class MappingParameters {
     boolean mapOnBackups() {
         return mapOnBackups;
     }
+
+    /** If not {@code null}, all nodes which meet the predicate must be excluded from 
mapping. */
+    @Nullable Predicate<String> nodeExclusionFilter() {
+        return nodeExclusionFilter;
+    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
index 34c5cd4f2c..e6bc8fd307 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
@@ -36,6 +36,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -62,6 +63,7 @@ import 
org.apache.ignite.internal.sql.engine.util.cache.CacheFactory;
 import org.apache.ignite.internal.util.CompletableFutures;
 import org.apache.ignite.lang.ErrorGroups.Sql;
 import org.apache.ignite.sql.SqlException;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * An implementation of {@link MappingService}.
@@ -125,36 +127,44 @@ public class MappingServiceImpl implements MappingService 
{
         FragmentsTemplate template = getOrCreateTemplate(multiStepPlan);
 
         boolean mapOnBackups = parameters.mapOnBackups();
+        Predicate<String> nodeExclusionFilter = 
parameters.nodeExclusionFilter();
 
-        MappingsCacheValue cacheValue = mappingsCache.compute(
-                new MappingsCacheKey(multiStepPlan.id(), mapOnBackups),
-                (key, val) -> {
-                    if (val == null) {
-                        IntSet tableIds = new IntOpenHashSet();
-                        boolean topologyAware = false;
-
-                        for (Fragment fragment : template.fragments) {
-                            topologyAware = topologyAware || 
!fragment.systemViews().isEmpty();
-                            for (IgniteDataSource source : 
fragment.tables().values()) {
-                                tableIds.add(source.id());
+        CompletableFuture<MappedFragments> mappedFragments;
+        if (nodeExclusionFilter != null) {
+            mappedFragments = mapFragments(template, mapOnBackups, 
nodeExclusionFilter);
+        } else {
+            mappedFragments = mappingsCache.compute(
+                    new MappingsCacheKey(multiStepPlan.id(), mapOnBackups),
+                    (key, val) -> {
+                        if (val == null) {
+                            IntSet tableIds = new IntOpenHashSet();
+                            boolean topologyAware = false;
+
+                            for (Fragment fragment : template.fragments) {
+                                topologyAware = topologyAware || 
!fragment.systemViews().isEmpty();
+                                for (IgniteDataSource source : 
fragment.tables().values()) {
+                                    tableIds.add(source.id());
+                                }
                             }
-                        }
 
-                        long topVer = topologyAware ? 
logicalTopologyVerSupplier.get() : Long.MAX_VALUE;
+                            long topVer = topologyAware ? 
logicalTopologyVerSupplier.get() : Long.MAX_VALUE;
 
-                        return new MappingsCacheValue(topVer, tableIds, 
mapFragments(template, mapOnBackups));
-                    }
+                            assert nodeExclusionFilter == null;
 
-                    long topologyVer = logicalTopologyVerSupplier.get();
+                            return new MappingsCacheValue(topVer, tableIds, 
mapFragments(template, mapOnBackups, null));
+                        }
 
-                    if (val.topologyVersion < topologyVer) {
-                        return new MappingsCacheValue(topologyVer, 
val.tableIds, mapFragments(template, mapOnBackups));
-                    }
+                        long topologyVer = logicalTopologyVerSupplier.get();
+
+                        if (val.topologyVersion < topologyVer) {
+                            return new MappingsCacheValue(topologyVer, 
val.tableIds, mapFragments(template, mapOnBackups, null));
+                        }
 
-                    return val;
-                });
+                        return val;
+                    }).mappedFragments;
+        }
 
-        return cacheValue.mappedFragments.thenApply(frags -> 
applyPartitionPruning(frags.fragments, parameters));
+        return mappedFragments.thenApply(frags -> 
applyPartitionPruning(frags.fragments, parameters));
     }
 
     CompletableFuture<DistributionHolder> composeDistributions(
@@ -207,7 +217,8 @@ public class MappingServiceImpl implements MappingService {
 
     private CompletableFuture<MappedFragments> mapFragments(
             FragmentsTemplate template,
-            boolean mapOnBackups
+            boolean mapOnBackups,
+            @Nullable Predicate<String> nodeExclusionFilter
     ) {
         Set<IgniteSystemView> views = 
template.fragments.stream().flatMap(fragment -> fragment.systemViews().stream())
                 .collect(Collectors.toSet());
@@ -220,7 +231,9 @@ public class MappingServiceImpl implements MappingService {
         return res.thenApply(assignments -> {
             Int2ObjectMap<ExecutionTarget> targetsById = new 
Int2ObjectOpenHashMap<>();
 
-            MappingContext context = new MappingContext(localNodeName, new 
ArrayList<>(assignments.nodes()), template.cluster);
+            MappingContext context = new MappingContext(
+                    localNodeName, assignments.nodes(nodeExclusionFilter), 
template.cluster
+            );
 
             ExecutionTargetFactory targetFactory = context.targetFactory();
 
@@ -429,8 +442,14 @@ public class MappingServiceImpl implements MappingService {
             this.nodesPerView = nodesPerView;
         }
 
-        Set<String> nodes() {
-            return nodes;
+        List<String> nodes(@Nullable Predicate<String> nodeExclusionFilter) {
+            if (nodeExclusionFilter == null) {
+                return List.copyOf(nodes);
+            }
+
+            return nodes.stream()
+                    .filter(nodeExclusionFilter.negate())
+                    .collect(Collectors.toList());
         }
 
         List<TokenizedAssignments> tableAssignments(int tableId) {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/LargeClusterFactory.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/LargeClusterFactory.java
index d5d5be9877..99f353cee1 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/LargeClusterFactory.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/LargeClusterFactory.java
@@ -22,11 +22,13 @@ import it.unimi.dsi.fastutil.objects.Object2IntMap;
 import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
 import java.util.BitSet;
 import java.util.List;
+import java.util.stream.Collectors;
 import org.apache.ignite.internal.partitiondistribution.Assignment;
 import org.apache.ignite.internal.partitiondistribution.TokenizedAssignments;
 import org.apache.ignite.internal.sql.engine.exec.NodeWithConsistencyToken;
 import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
 import 
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory;
+import org.apache.ignite.internal.sql.engine.exec.mapping.MappingException;
 
 /**
  * A factory that able to create targets for cluster with up to 64 nodes.
@@ -55,7 +57,9 @@ public class LargeClusterFactory implements 
ExecutionTargetFactory {
 
         for (String name : nodes) {
             int id = nodeNameToId.getOrDefault(name, -1);
-            assert id >= 0 : "invalid node";
+            if (id == -1) {
+                throw new MappingException("Mandatory node was excluded from 
mapping: " + name);
+            }
 
             nodesSet.set(id);
         }
@@ -85,12 +89,18 @@ public class LargeClusterFactory implements 
ExecutionTargetFactory {
             for (Assignment a : assignment.nodes()) {
                 int node = nodeNameToId.getOrDefault(a.consistentId(), -1);
 
-                assert node >= 0 : "invalid node";
-
-                nodes.set(node);
+                if (node != -1) {
+                    nodes.set(node);
+                }
             }
 
-            assert !nodes.isEmpty() : "No partition node found";
+            if (nodes.isEmpty()) {
+                List<String> nodes0 = assignment.nodes().stream()
+                        .map(Assignment::consistentId)
+                        .collect(Collectors.toList());
+
+                throw new MappingException("Mandatory nodes was excluded from 
mapping: " + nodes0);
+            }
 
             finalised = finalised && nodes.cardinality() < 2;
 
@@ -121,14 +131,20 @@ public class LargeClusterFactory implements 
ExecutionTargetFactory {
     }
 
     private BitSet nodeListToMap(List<String> nodes) {
+        assert !nodes.isEmpty() : "Empty target is not allowed";
+
         BitSet nodesSet = new BitSet(nodeNameToId.size());
 
         for (String name : nodes) {
             int id = nodeNameToId.getOrDefault(name, -1);
 
-            assert id >= 0 : "invalid node";
+            if (id >= 0) {
+                nodesSet.set(id);
+            }
+        }
 
-            nodesSet.set(id);
+        if (nodesSet.isEmpty()) {
+            throw new MappingException("Mandatory nodes was excluded from 
mapping: " + nodes);
         }
 
         return nodesSet;
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
index fde0fe0fac..c4accfc78c 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
@@ -29,6 +29,7 @@ import 
org.apache.ignite.internal.partitiondistribution.TokenizedAssignments;
 import org.apache.ignite.internal.sql.engine.exec.NodeWithConsistencyToken;
 import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
 import 
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory;
+import org.apache.ignite.internal.sql.engine.exec.mapping.MappingException;
 
 /**
  * A factory that able to create targets for cluster with up to 64 nodes.
@@ -60,7 +61,11 @@ public class SmallClusterFactory implements 
ExecutionTargetFactory {
 
         for (String name : nodes) {
             long node = nodeNameToId.getOrDefault(name, -1);
-            assert node >= 0 : "invalid node";
+
+            if (node == -1) {
+                throw new MappingException("Mandatory node was excluded from 
mapping: " + name);
+            }
+
             nodesMap |= node;
         }
 
@@ -89,12 +94,18 @@ public class SmallClusterFactory implements 
ExecutionTargetFactory {
             for (Assignment a : assignment.nodes()) {
                 long node = nodeNameToId.getOrDefault(a.consistentId(), -1);
 
-                assert node >= 0 : "invalid node";
-
-                currentPartitionNodes |= node;
+                if (node != -1) {
+                    currentPartitionNodes |= node;
+                }
             }
 
-            assert currentPartitionNodes != 0L : "No partition node found";
+            if (currentPartitionNodes == 0) {
+                List<String> nodes = assignment.nodes().stream()
+                        .map(Assignment::consistentId)
+                        .collect(Collectors.toList());
+
+                throw new MappingException("Mandatory nodes was excluded from 
mapping: " + nodes);
+            }
 
             finalised = finalised && isPow2(currentPartitionNodes);
 
@@ -126,14 +137,20 @@ public class SmallClusterFactory implements 
ExecutionTargetFactory {
     }
 
     private long nodeListToMap(List<String> nodes) {
+        assert !nodes.isEmpty() : "Empty target is not allowed";
+
         long nodesMap = 0;
 
         for (String name : nodes) {
             long node = nodeNameToId.getOrDefault(name, -1);
 
-            assert node >= 0 : "invalid node";
+            if (node >= 0) {
+                nodesMap |= node;
+            }
+        }
 
-            nodesMap |= node;
+        if (nodesMap == 0) {
+            throw new MappingException("Mandatory nodes was excluded from 
mapping: " + nodes);
         }
 
         return nodesMap;
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageService.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageService.java
index 4fca0b0140..e35cef1744 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageService.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageService.java
@@ -31,6 +31,9 @@ public interface MessageService extends LifecycleAware {
      *
      * @param nodeName Node consistent ID.
      * @param msg Message.
+     * @return Future which will be completed successfully when the message is 
sent. The returned future may be completed exceptionally with
+     *         {@link UnknownNodeException} when the recipient cannot be found by 
the given name, or with any other error in case something went
+     *         wrong during sending.
      */
     CompletableFuture<Void> send(String nodeName, NetworkMessage msg);
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageServiceImpl.java
index 2cdd9a0432..1ac2a92479 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageServiceImpl.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.sql.engine.message;
 
 import static 
org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup.GROUP_TYPE;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -28,6 +29,7 @@ import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.network.ChannelType;
 import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.UnresolvableConsistentIdException;
 import org.apache.ignite.internal.replicator.message.TimestampAware;
 import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
@@ -88,7 +90,16 @@ public class MessageServiceImpl implements MessageService {
 
                 return nullCompletedFuture();
             } else {
-                return messagingSrvc.send(nodeName, ChannelType.DEFAULT, msg);
+                return messagingSrvc.send(nodeName, ChannelType.DEFAULT, msg)
+                        .exceptionally(ex -> {
+                            if (ex instanceof 
UnresolvableConsistentIdException) {
+                                ex = new UnknownNodeException(nodeName);
+                            }
+
+                            sneakyThrow(ex);
+
+                            throw new AssertionError("Should not get here"); 
+                        });
             }
         } catch (Exception ex) {
             return CompletableFuture.failedFuture(ex);
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionService.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/UnknownNodeException.java
similarity index 55%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionService.java
copy to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/UnknownNodeException.java
index e62e4e576c..318c5c7328 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionService.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/UnknownNodeException.java
@@ -15,17 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.exec;
+package org.apache.ignite.internal.sql.engine.message;
 
-import org.apache.ignite.internal.sql.engine.InternalSqlRow;
-import org.apache.ignite.internal.sql.engine.SqlOperationContext;
-import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
+/** Thrown by {@link MessageService} when recipient cannot be found in 
physical topology. */
+public class UnknownNodeException extends RuntimeException {
+    private static final long serialVersionUID = -8242883657846080305L;
 
-/**
- * SQL query plan execution interface.
- */
-public interface ExecutionService extends LifecycleAware {
-    AsyncDataCursor<InternalSqlRow> executePlan(
-            QueryPlan plan, SqlOperationContext operationContext
-    );
+    private final String nodeName;
+
+    /** Constructs the object. */ 
+    UnknownNodeException(String nodeName) {
+        super("Unknown node: " + nodeName, null, true, false);
+
+        this.nodeName = nodeName;
+    }
+
+    /** Returns name of the node that cannot be found in physical topology. */
+    public String nodeName() {
+        return nodeName;
+    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index c0cc139efa..72c8cb29e0 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -23,15 +23,13 @@ import static 
org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static 
org.apache.ignite.internal.testframework.asserts.CompletableFutureAssert.assertWillThrow;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
-import static org.apache.ignite.lang.ErrorGroups.Common.NODE_LEFT_ERR;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -39,7 +37,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
@@ -62,7 +59,6 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -163,6 +159,7 @@ import org.junit.jupiter.api.TestInfo;
 /**
  * Test class to verify {@link ExecutionServiceImplTest}.
  */
+@SuppressWarnings("ThrowableNotThrown")
 public class ExecutionServiceImplTest extends BaseIgniteAbstractTest {
     /** Tag allows to skip default cluster setup. */
     private static final String CUSTOM_CLUSTER_SETUP_TAG = 
"skipDefaultClusterSetup";
@@ -280,7 +277,7 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
 
         nodeNames.stream().map(testCluster::node).forEach(TestNode::pauseScan);
 
-        AsyncCursor<InternalSqlRow> cursor = execService.executePlan(plan, 
ctx);
+        AsyncCursor<InternalSqlRow> cursor = 
await(execService.executePlan(plan, ctx));
 
         assertTrue(waitForCondition(
                 () -> executionServices.stream().map(es -> 
es.localFragments(ctx.queryId()).size())
@@ -320,7 +317,7 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
 
         nodeNames.stream().map(testCluster::node).forEach(TestNode::pauseScan);
 
-        AsyncCursor<InternalSqlRow> cursor = execService.executePlan(plan, 
ctx);
+        AsyncCursor<InternalSqlRow> cursor = 
await(execService.executePlan(plan, ctx));
 
         assertTrue(waitForCondition(
                 () -> executionServices.stream().map(es -> 
es.localFragments(ctx.queryId()).size())
@@ -377,7 +374,7 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
             return nullCompletedFuture();
         });
 
-        AsyncCursor<InternalSqlRow> cursor = execService.executePlan(plan, 
ctx);
+        AsyncCursor<InternalSqlRow> cursor = 
await(execService.executePlan(plan, ctx));
 
         CompletionStage<?> batchFut = cursor.requestNextAsync(1);
 
@@ -418,25 +415,15 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
 
         nodeNames.stream().map(testCluster::node).forEach(TestNode::pauseScan);
 
-        AsyncCursor<InternalSqlRow> cursor = execService.executePlan(plan, 
ctx);
-
-        var batchFut = cursor.requestNextAsync(1);
-
-        await(batchFut.exceptionally(ex -> {
-            assertInstanceOf(CompletionException.class, ex);
-            assertInstanceOf(SqlException.class, ex.getCause());
-            assertInstanceOf(IgniteException.class, ex.getCause().getCause());
-            assertInstanceOf(mappingException.getClass(), 
ex.getCause().getCause().getCause());
-            assertEquals(mappingException.getMessage(), 
ex.getCause().getCause().getCause().getMessage());
-
-            return null;
-        }));
-
-        assertTrue(batchFut.toCompletableFuture().isCompletedExceptionally());
+        IgniteTestUtils.assertThrows(
+                IllegalStateException.class,
+                () -> await(execService.executePlan(plan, ctx)),
+                mappingException.getMessage()
+        );
     }
 
     @Test
-    void cursorCloseCompletesSuccessfullyIfRootWasInitializedWithError() 
throws InterruptedException {
+    void testErrorOnCursorInitialization() throws InterruptedException {
         ExecutionService execService = executionServices.get(0);
         SqlOperationContext ctx = createContext();
         QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
@@ -460,9 +447,9 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
             return nullCompletedFuture();
         });
 
-        AsyncCursor<InternalSqlRow> cursor = execService.executePlan(plan, 
ctx);
+        RuntimeException actualException = 
assertWillThrow(execService.executePlan(plan, ctx), RuntimeException.class);
 
-        await(cursor.closeAsync());
+        assertEquals(expectedEx, actualException);
 
         // try gather all possible nodes.
         List<AbstractNode<?>> execNodes = executionServices.stream()
@@ -486,7 +473,7 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
 
         nodeNames.stream().map(testCluster::node).forEach(TestNode::pauseScan);
 
-        AsyncCursor<InternalSqlRow> cursor = execService.executePlan(plan, 
ctx);
+        AsyncCursor<InternalSqlRow> cursor = 
await(execService.executePlan(plan, ctx));
 
         assertTrue(waitForCondition(
                 () -> executionServices.stream().map(es -> 
es.localFragments(ctx.queryId()).size())
@@ -530,9 +517,7 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
 
         QueryPlan plan = prepare("SELECT 1", ctx);
 
-        AsyncCursor<InternalSqlRow> cursor = execService.executePlan(plan, 
ctx);
-
-        assertThrows(ExceptionInInitializerError.class, () -> 
await(cursor.requestNextAsync(1), 2, TimeUnit.SECONDS));
+        assertWillThrow(execService.executePlan(plan, ctx), 
ExceptionInInitializerError.class);
     }
 
     /**
@@ -544,7 +529,7 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
         SqlOperationContext ctx = createContext();
         QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
 
-        AsyncCursor<InternalSqlRow> cursor = execService.executePlan(plan, 
ctx);
+        AsyncCursor<InternalSqlRow> cursor = 
await(execService.executePlan(plan, ctx));
 
         BatchedResult<?> res = await(cursor.requestNextAsync(8));
         assertNotNull(res);
@@ -570,7 +555,7 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
         SqlOperationContext ctx = createContext();
         QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
 
-        AsyncCursor<InternalSqlRow> cursor = execService.executePlan(plan, 
ctx);
+        AsyncCursor<InternalSqlRow> cursor = 
await(execService.executePlan(plan, ctx));
 
         BatchedResult<?> res = await(cursor.requestNextAsync(9));
         assertNotNull(res);
@@ -587,12 +572,10 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
      */
     @Test
     public void testCursorIsClosedAfterAllDataReadWithNodeFailure() throws 
InterruptedException {
-        ExecutionServiceImpl execService = executionServices.get(0);
+        ExecutionServiceImpl<InternalSqlRow> execService = 
(ExecutionServiceImpl<InternalSqlRow>) executionServices.get(0);
         SqlOperationContext ctx = createContext();
         QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
 
-        AsyncCursor<InternalSqlRow> cursor = execService.executePlan(plan, 
ctx);
-
         // node failed trigger
         CountDownLatch nodeFailedLatch = new CountDownLatch(1);
         // start response trigger
@@ -623,6 +606,8 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
             }
         }));
 
+        AsyncCursor<InternalSqlRow> cursor = 
await(execService.executePlan(plan, ctx));
+
         CompletableFuture<BatchedResult<InternalSqlRow>> resFut = 
cursor.requestNextAsync(9);
 
         startResponse.await();
@@ -680,14 +665,11 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
             }
         }));
 
-        AsyncDataCursor<InternalSqlRow> cursor = execService.executePlan(plan, 
ctx);
-
-        // Wait till the query fails due to nodes' unavailability.
-        ExecutionException eex = assertThrows(ExecutionException.class, () -> 
cursor.requestNextAsync(1).get(10, TimeUnit.SECONDS));
-        assertThat(eex.getCause(), instanceOf(SqlException.class));
-        assertThat(eex.getCause().getCause(), 
instanceOf(NodeLeftException.class));
-        assertThat(eex.getCause().getCause().getMessage(), 
containsString("cause=Node left the cluster"));
-        assertThat(((NodeLeftException) eex.getCause().getCause()).code(), 
equalTo(NODE_LEFT_ERR));
+        IgniteTestUtils.assertThrowsWithCause(
+                () -> await(execService.executePlan(plan, ctx)),
+                NodeLeftException.class,
+                "Node left the cluster"
+        );
 
         CompletableFuture<Void> stopFuture = CompletableFuture.runAsync(() -> {
             try {
@@ -709,7 +691,7 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
     public void testPrefetchCallbackInvocation() throws Exception {
         String query = "SELECT * FROM test_tbl";
         int totalStatements = 20;
-        Collection<AsyncCursor<InternalSqlRow>> resultCursors = new 
ArrayBlockingQueue<>(totalStatements);
+        Collection<CompletableFuture<AsyncDataCursor<InternalSqlRow>>> 
resultCursors = new ArrayBlockingQueue<>(totalStatements);
         List<String> queries = IntStream.range(0, 
totalStatements).boxed().map(n -> query).collect(Collectors.toList());
         ArrayBlockingQueue<String> queriesQueue = new 
ArrayBlockingQueue<>(totalStatements, false, queries);
         AtomicReference<AssertionError> errHolder = new AtomicReference<>();
@@ -757,7 +739,7 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
         assertEquals(queries.size(), resultCursors.size());
 
         CompletableFuture<?>[] closeFutures = resultCursors.stream()
-                .map(AsyncCursor::closeAsync)
+                .map(cursorFuture -> 
cursorFuture.thenCompose(AsyncCursor::closeAsync))
                 .toArray(CompletableFuture[]::new);
 
         assertThat(allOf(closeFutures), willCompleteSuccessfully());
@@ -789,7 +771,7 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
         });
 
         QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
-        AsyncCursor<InternalSqlRow> cursor = execService.executePlan(plan, 
ctx);
+        AsyncCursor<InternalSqlRow> cursor = 
await(execService.executePlan(plan, ctx));
 
         assertThat(prefetchCallback.prefetchFuture(), 
willThrow(equalTo(expectedException)));
 
@@ -812,7 +794,7 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
 
         // Should immediately trigger query cancel exception.
         IgniteTestUtils.assertThrows(QueryCancelledException.class,
-                () -> execService.executePlan(plan, ctx),
+                () -> await(execService.executePlan(plan, ctx)),
                 "The query was cancelled while executing"
         );
     }
@@ -852,7 +834,7 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
             }
         }));
 
-        AsyncCursor<InternalSqlRow> cursor = execService.executePlan(plan, 
ctx);
+        AsyncCursor<InternalSqlRow> cursor = 
await(execService.executePlan(plan, ctx));
 
         startResponseLatch.await(TIMEOUT_IN_MS, TimeUnit.MILLISECONDS);
 
@@ -945,7 +927,7 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
 
         // Should immediately trigger query cancel exception.
         IgniteTestUtils.assertThrows(QueryCancelledException.class,
-                () -> execService.executePlan(plan, ctx),
+                () -> await(execService.executePlan(plan, ctx)),
                 "Query timeout"
         );
     }
@@ -1068,18 +1050,17 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
                 .build();
 
         CompletableFuture<AsyncDataCursor<InternalSqlRow>> execPlanFut =
-                runAsync(() -> executionServices.get(0).executePlan(plan, 
ctx));
+                runAsync(() -> 
await(executionServices.get(0).executePlan(plan, ctx)));
 
         // Wait until timeout is fired and unblock mapping service.
         assertThat(timeoutFut, willCompleteSuccessfully());
 
         mappingsCacheAccessBlock.countDown();
 
-        AsyncDataCursor<InternalSqlRow> cursor = await(execPlanFut);
         //noinspection ThrowableNotThrown
         IgniteTestUtils.assertThrowsWithCause(
-                () -> await(cursor.requestNextAsync(9)),
-                SqlException.class,
+                () -> await(execPlanFut),
+                QueryCancelledException.class,
                 "Query timeout"
         );
 
@@ -1256,7 +1237,7 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
 
             AsyncCursor<InternalSqlRow> cursor;
             try {
-                cursor = execService.executePlan(plan, execCtx);
+                cursor = await(execService.executePlan(plan, execCtx));
             } catch (QueryCancelledException e) {
                 // This might happen when initialization took longer than a 
time out,
                 // Retry to get a proper error.
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRecoveryTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRecoveryTest.java
new file mode 100644
index 0000000000..7e14487bc4
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRecoveryTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
+import org.apache.ignite.internal.sql.engine.InternalSqlRow;
+import org.apache.ignite.internal.sql.engine.QueryProcessor;
+import org.apache.ignite.internal.sql.engine.framework.NoOpTransaction;
+import org.apache.ignite.internal.sql.engine.framework.NoOpTransactionTracker;
+import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
+import org.apache.ignite.internal.sql.engine.framework.TestCluster;
+import org.apache.ignite.internal.sql.engine.framework.TestNode;
+import org.apache.ignite.internal.sql.engine.message.UnknownNodeException;
+import org.apache.ignite.internal.sql.engine.prepare.QueryMetadata;
+import org.apache.ignite.internal.sql.engine.property.SqlProperties;
+import org.apache.ignite.internal.sql.engine.tx.QueryTransactionContext;
+import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapper;
+import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapperImpl;
+import org.apache.ignite.internal.sql.engine.util.InjectQueryCheckerFactory;
+import org.apache.ignite.internal.sql.engine.util.QueryChecker;
+import org.apache.ignite.internal.sql.engine.util.QueryCheckerExtension;
+import org.apache.ignite.internal.sql.engine.util.QueryCheckerFactory;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.tx.HybridTimestampTracker;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.lang.CancellationToken;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+/**
+ * Tests query recovery when a node leaves the cluster before a fragment has been sent to it.
+ */
+@SuppressWarnings("ThrowableNotThrown")
+@ExtendWith(QueryCheckerExtension.class)
+public class QueryRecoveryTest extends BaseIgniteAbstractTest {
+    private static final List<String> DATA_NODES = List.of("DATA_1", "DATA_2");
+    private static final String GATEWAY_NODE_NAME = "gateway";
+
+    @InjectQueryCheckerFactory
+    private static QueryCheckerFactory queryCheckerFactory;
+
+    private TestCluster cluster;
+
+    @BeforeEach
+    void startCluster() {
+        cluster = TestBuilders.cluster()
+                .nodes(GATEWAY_NODE_NAME, DATA_NODES.toArray(new String[0]))
+                .build();
+
+        cluster.start();
+
+        TestNode gatewayNode = cluster.node(GATEWAY_NODE_NAME);
+
+        gatewayNode.initSchema("CREATE TABLE t1 (id INT PRIMARY KEY, part_id 
INT, node VARCHAR(128))");
+
+        cluster.setAssignmentsProvider("T1", (partitionCount, b) ->
+                IntStream.range(0, partitionCount)
+                        .mapToObj(i -> DATA_NODES)
+                        .collect(Collectors.toList())
+        );
+
+        cluster.setDataProvider("T1", TestBuilders.tableScan((nodeName, 
partId) ->
+                Collections.singleton(new Object[]{partId, partId, nodeName}))
+        );
+    }
+
+    @AfterEach
+    void stopCluster() throws Exception {
+        cluster.stop();
+    }
+
+    @ParameterizedTest
+    @EnumSource
+    void 
queryWithImplicitTxRecoversWhenNodeLeftClusterBeforeFragmentHasBeenSent(TxType 
txType) throws Exception {
+        TestNode gatewayNode = cluster.node(GATEWAY_NODE_NAME);
+
+        QueryTransactionContext txContext = createTxContext(txType, true);
+
+        // mapping is supposed to be stable, thus if it returns 0th node from 
DATA_NODES on some environment,
+        // it should return the same node on all environments
+        String firstExpectedNode = DATA_NODES.get(0);
+        assertQuery(gatewayNode, "SELECT node FROM t1 WHERE part_id = 0", 
txContext)
+                .returns(firstExpectedNode)
+                .check();
+
+        cluster.node(firstExpectedNode).stop();
+
+        // first expected node is not available, query must be remapped to 
next available node
+        assertQuery(gatewayNode, "SELECT node FROM t1 WHERE part_id = 0", 
txContext)
+                .returns(DATA_NODES.get(1))
+                .check();
+    }
+
+    @ParameterizedTest
+    @EnumSource
+    void 
queryWithExplicitTxCannotRecoverWhenNodeLeftClusterBeforeFragmentHasBeenSent(TxType
 txType) throws Exception {
+        TestNode gatewayNode = cluster.node(GATEWAY_NODE_NAME);
+
+        QueryTransactionContext txContext = createTxContext(txType, false);
+
+        // mapping is supposed to be stable, thus if it returns 0th node from 
DATA_NODES on some environment,
+        // it should return the same node on all environments
+        String firstExpectedNode = DATA_NODES.get(0);
+        assertQuery(gatewayNode, "SELECT node FROM t1 WHERE part_id = 0", 
txContext)
+                .returns(firstExpectedNode)
+                .check();
+
+        cluster.node(firstExpectedNode).stop();
+
+        assertThrows(
+                UnknownNodeException.class,
+                () -> assertQuery(gatewayNode, "SELECT node FROM t1 WHERE 
part_id = 0", txContext).check(),
+                "Unknown node: " + firstExpectedNode
+        );
+    }
+
+    private static QueryTransactionContext createTxContext(TxType type, 
boolean implicit) {
+        InternalTransaction tx = type.create();
+        QueryTransactionWrapper wrapper = new QueryTransactionWrapperImpl(tx, 
implicit, NoOpTransactionTracker.INSTANCE);
+        return new PredefinedTxContext(wrapper);
+    }
+
+    private static QueryChecker assertQuery(TestNode node, String query, 
QueryTransactionContext txContext) {
+        return queryCheckerFactory.create(
+                node.name(),
+                new QueryProcessor() {
+                    @Override
+                    public CompletableFuture<QueryMetadata> 
prepareSingleAsync(SqlProperties properties,
+                            @Nullable InternalTransaction transaction, String 
qry, Object... params) {
+                        throw new AssertionError("Should not be called");
+                    }
+
+                    @Override
+                    public CompletableFuture<AsyncSqlCursor<InternalSqlRow>> 
queryAsync(
+                            SqlProperties properties,
+                            HybridTimestampTracker observableTime,
+                            @Nullable InternalTransaction transaction,
+                            @Nullable CancellationToken token,
+                            String qry,
+                            Object... params
+                    ) {
+                        return completedFuture(node.executeQuery(txContext, 
query));
+                    }
+
+                    @Override
+                    public CompletableFuture<Void> startAsync(ComponentContext 
componentContext) {
+                        return nullCompletedFuture();
+                    }
+
+                    @Override
+                    public CompletableFuture<Void> stopAsync(ComponentContext 
componentContext) {
+                        return nullCompletedFuture();
+                    }
+                },
+                null,
+                null,
+                query
+        );
+    }
+
+    static class PredefinedTxContext implements QueryTransactionContext {
+        private final QueryTransactionWrapper txWrapper;
+
+        PredefinedTxContext(QueryTransactionWrapper txWrapper) {
+            this.txWrapper = txWrapper;
+        }
+
+        @Override
+        public QueryTransactionWrapper getOrStartImplicit(boolean readOnly) {
+            return txWrapper;
+        }
+
+        @Override
+        public void updateObservableTime(HybridTimestamp time) {
+            // NO-OP
+        }
+
+        @Override
+        public @Nullable QueryTransactionWrapper explicitTx() {
+            return txWrapper.implicit() ? null : txWrapper;
+        }
+    }
+
+    enum TxType {
+        RW {
+            @Override
+            InternalTransaction create() {
+                return NoOpTransaction.readWrite("LOCALHOST", false);
+            }
+        },
+
+        RO {
+            @Override
+            InternalTransaction create() {
+                return NoOpTransaction.readOnly("LOCALHOST", false);
+            }
+        };
+
+        abstract InternalTransaction create();
+    }
+}
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java
index 5fc233d0be..1a4c2503ec 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java
@@ -92,18 +92,26 @@ public class ExecutionTargetFactorySelfTest {
     @MethodSource("clusterFactory")
     void invalidTargets(ExecutionTargetFactory f) {
         List<String> invalidNodeSet = List.of("node100");
-        List<String> partiallyInvalidNodeSet = 
CollectionUtils.concat(SINGLE_NODE_SET, invalidNodeSet);
-
-        assertThrows(AssertionError.class, () -> f.allOf(invalidNodeSet), 
"invalid node");
-        assertThrows(AssertionError.class, () -> f.someOf(invalidNodeSet), 
"invalid node");
-        assertThrows(AssertionError.class, () -> f.oneOf(invalidNodeSet), 
"invalid node");
-        assertThrows(AssertionError.class, () -> 
f.partitioned(assignmentFromPrimaries(invalidNodeSet)), "invalid node");
-
-        assertThrows(AssertionError.class, () -> 
f.allOf(partiallyInvalidNodeSet), "invalid node");
-        assertThrows(AssertionError.class, () -> 
f.resolveNodes(f.someOf(partiallyInvalidNodeSet)), "invalid node");
-        assertThrows(AssertionError.class, () -> 
f.resolveNodes(f.oneOf(partiallyInvalidNodeSet)), "invalid node");
-        assertThrows(AssertionError.class, () -> f.resolveNodes(
-                f.partitioned(assignment(partiallyInvalidNodeSet, 
partiallyInvalidNodeSet))), "invalid node");
+
+        assertThrows(MappingException.class, () -> f.allOf(invalidNodeSet), 
"Mandatory node was excluded from mapping: node100");
+        assertThrows(MappingException.class, () -> f.someOf(invalidNodeSet), 
"Mandatory nodes was excluded from mapping: [node100]");
+        assertThrows(MappingException.class, () -> f.oneOf(invalidNodeSet), 
"Mandatory nodes was excluded from mapping: [node100]");
+        assertThrows(MappingException.class, () -> f.partitioned(
+                assignmentFromPrimaries(invalidNodeSet)), "Mandatory nodes was 
excluded from mapping: [node100]");
+    }
+
+    @ParameterizedTest
+    @MethodSource("clusterFactory")
+    void partiallyInvalidTargets(ExecutionTargetFactory f) {
+        List<String> partiallyInvalidNodeSet = 
CollectionUtils.concat(SINGLE_NODE_SET, List.of("node100"));
+
+        // AllOf requires all provided nodes to be used
+        assertThrows(MappingException.class, () -> 
f.allOf(partiallyInvalidNodeSet), "Mandatory node was excluded from mapping: 
node100");
+
+        // the rest of the targets can be executed on a subset of the nodes
+        assertThat(f.resolveNodes(f.someOf(partiallyInvalidNodeSet)), 
equalTo(SINGLE_NODE_SET));
+        assertThat(f.resolveNodes(f.oneOf(partiallyInvalidNodeSet)), 
equalTo(SINGLE_NODE_SET));
+        
assertThat(f.resolveNodes(f.partitioned(assignment(partiallyInvalidNodeSet, 
partiallyInvalidNodeSet))), equalTo(SINGLE_NODE_SET));
     }
 
     @ParameterizedTest
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
index fef3e7fa88..b5c2e88a17 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
@@ -20,6 +20,9 @@ package org.apache.ignite.internal.sql.engine.exec.mapping;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.in;
+import static org.hamcrest.Matchers.not;
 import static org.junit.jupiter.api.Assertions.assertNotSame;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -106,7 +109,7 @@ public class MappingServiceImplTest extends 
BaseIgniteAbstractTest {
 
         try {
             PLAN = (MultiStepPlan) cluster.node("N1").prepare("SELECT * FROM 
t1");
-            PLAN_WITH_SYSTEM_VIEW = (MultiStepPlan) 
cluster.node("N1").prepare("SELECT * FROM system.test_view, t1");
+            PLAN_WITH_SYSTEM_VIEW = (MultiStepPlan) 
cluster.node("N1").prepare("SELECT * FROM system.test_view");
         } finally {
             try {
                 cluster.stop();
@@ -156,6 +159,36 @@ public class MappingServiceImplTest extends 
BaseIgniteAbstractTest {
         assertThat(mappingFuture, willSucceedFast());
     }
 
+    @Test
+    void mappingWithNodeFilter() {
+        String localNodeName = "NODE";
+        List<String> nodeNames = List.of("NODE1", "NODE2");
+
+        MappingService service = createMappingService(localNodeName, 
nodeNames, 100);
+
+        List<MappedFragment> defaultMapping = 
await(service.map(PLAN_WITH_SYSTEM_VIEW, PARAMS));
+
+        assertThat(defaultMapping, hasSize(2));
+
+        MappedFragment leafFragment = defaultMapping.stream()
+                .filter(fragment -> !fragment.fragment().rootFragment())
+                .findFirst()
+                .orElseThrow();
+
+        assertThat(leafFragment.nodes(), hasSize(1));
+
+        String nodeToExclude = leafFragment.nodes().get(0);
+
+        MappingParameters params = MappingParameters.create(new Object[0], 
false, nodeToExclude::equals);
+        List<MappedFragment> mappingWithExclusion = 
await(service.map(PLAN_WITH_SYSTEM_VIEW, params));
+
+        assertNotSame(defaultMapping, mappingWithExclusion);
+
+        for (MappedFragment fragment : mappingWithExclusion) {
+            assertThat(nodeToExclude, not(in(fragment.nodes())));
+        }
+    }
+
     @Test
     public void testCacheInvalidationOnTopologyChange() {
         String localNodeName = "NODE";
@@ -177,7 +210,7 @@ public class MappingServiceImplTest extends 
BaseIgniteAbstractTest {
         List<MappedFragment> tableOnlyMapping = await(mappingService.map(PLAN, 
PARAMS));
         List<MappedFragment> sysViewMapping = 
await(mappingService.map(PLAN_WITH_SYSTEM_VIEW, PARAMS));
 
-        verify(execProvider, times(2)).forTable(any(HybridTimestamp.class), 
any(IgniteTable.class), anyBoolean());
+        verify(execProvider, times(1)).forTable(any(HybridTimestamp.class), 
any(IgniteTable.class), anyBoolean());
         verify(execProvider, times(1)).forSystemView(any());
 
         verify(mappingService, times(2)).composeDistributions(anySet(), 
anySet(), anyBoolean());
@@ -195,7 +228,7 @@ public class MappingServiceImplTest extends 
BaseIgniteAbstractTest {
         // Plan with tables only must not be invalidated on topology change.
         assertSame(tableOnlyMapping, await(mappingService.map(PLAN, PARAMS)));
 
-        verify(execProvider, times(3)).forTable(any(HybridTimestamp.class), 
any(IgniteTable.class), anyBoolean());
+        verify(execProvider, times(1)).forTable(any(HybridTimestamp.class), 
any(IgniteTable.class), anyBoolean());
         verify(execProvider, times(2)).forSystemView(any());
     }
 
@@ -233,21 +266,19 @@ public class MappingServiceImplTest extends 
BaseIgniteAbstractTest {
                 execProvider
         ));
 
-        List<MappedFragment> mappedFragments = 
await(mappingService.map(PLAN_WITH_SYSTEM_VIEW, PARAMS));
+        List<MappedFragment> mappedFragments = await(mappingService.map(PLAN, 
PARAMS));
         verify(execProvider, times(1)).forTable(any(HybridTimestamp.class), 
any(IgniteTable.class), anyBoolean());
-        verify(execProvider, times(1)).forSystemView(any());
 
         // Simulate expiration of the primary replica for non-mapped table - 
the cache entry should not be invalidated.
         
await(mappingService.onPrimaryReplicaExpired(prepareEvtParams.apply("T2")));
-        assertSame(mappedFragments, 
await(mappingService.map(PLAN_WITH_SYSTEM_VIEW, PARAMS)));
+        assertSame(mappedFragments, await(mappingService.map(PLAN, PARAMS)));
 
         verify(mappingService, times(1)).composeDistributions(anySet(), 
anySet(), anyBoolean());
 
         // Simulate expiration of the primary replica for mapped table - the 
cache entry should be invalidated.
         
await(mappingService.onPrimaryReplicaExpired(prepareEvtParams.apply("T1")));
-        assertNotSame(mappedFragments, 
await(mappingService.map(PLAN_WITH_SYSTEM_VIEW, PARAMS)));
+        assertNotSame(mappedFragments, await(mappingService.map(PLAN, 
PARAMS)));
         verify(execProvider, times(2)).forTable(any(HybridTimestamp.class), 
any(IgniteTable.class), anyBoolean());
-        verify(execProvider, times(2)).forSystemView(any());
     }
 
     private MappingServiceImpl createMappingServiceNoCache(String 
localNodeName, List<String> nodeNames) {


Reply via email to