This is an automated email from the ASF dual-hosted git repository.

korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 5e47e6b31f IGNITE-23663 Sql. Make TestNode to use QueryExecutor (#4711)
5e47e6b31f is described below

commit 5e47e6b31f9fc3ad32b6eb20c637822700b01fd4
Author: korlov42 <[email protected]>
AuthorDate: Wed Nov 13 13:09:06 2024 +0200

    IGNITE-23663 Sql. Make TestNode to use QueryExecutor (#4711)
---
 .../ignite/internal/sql/engine/exec/fsm/Query.java |  12 +-
 .../sql/engine/tx/ScriptTransactionContext.java    |   6 +-
 .../sql/engine/benchmarks/SqlBenchmark.java        |  31 +-
 .../sql/engine/exec/TransactionEnlistTest.java     |  39 +--
 .../exec/coercion/BaseTypeCheckExecutionTest.java  |  20 +-
 .../engine/framework/ClusterServiceFactory.java    |  31 +-
 .../sql/engine/framework/NoOpTransaction.java      |   3 +-
 .../engine/framework/NoOpTransactionTracker.java   |   4 +-
 .../sql/engine/framework/TestBuilders.java         | 317 +++++++++++++--------
 .../internal/sql/engine/framework/TestCluster.java |  17 ++
 .../sql/engine/framework/TestClusterTest.java      | 103 +++----
 .../internal/sql/engine/framework/TestNode.java    | 146 ++++++----
 .../internal/sql/engine/util/QueryCheckerTest.java |  42 +--
 13 files changed, 468 insertions(+), 303 deletions(-)

diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Query.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Query.java
index b7a6414a92..3fc324bb64 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Query.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Query.java
@@ -173,9 +173,19 @@ class Query implements Runnable {
     /** Moves the query to a given state. */
     void moveTo(ExecutionPhase newPhase) {
         synchronized (mux) {
+            ExecutionPhase currentPhase = this.currentPhase;
+
+            // Transition from TERMINATED to TERMINATED is possible when a 
query completes successfully 
+            // just a moment before the node starts to shut down. In this case, 
all running queries are terminated
+            // with NodeStoppingException, and the completed query may still be 
present in the list of running queries.
+            if (currentPhase == newPhase && currentPhase == 
ExecutionPhase.TERMINATED) {
+                // ignore such transition
+                return;
+            }
+
             assert currentPhase.transitionAllowed(newPhase) : "currentPhase=" 
+ currentPhase + ", newPhase=" + newPhase;
 
-            currentPhase = newPhase;
+            this.currentPhase = newPhase;
         }
 
         onPhaseStartedCallback.computeIfAbsent(newPhase, k -> new 
CompletableFuture<>()).complete(null);
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/ScriptTransactionContext.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/ScriptTransactionContext.java
index 6e158e4ea5..e38f5f7a31 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/ScriptTransactionContext.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/ScriptTransactionContext.java
@@ -39,7 +39,7 @@ import org.jetbrains.annotations.Nullable;
  * Starts an implicit or script-driven transaction if there is no external 
transaction.
  */
 public class ScriptTransactionContext implements QueryTransactionContext {
-    private final QueryTransactionContextImpl txContext;
+    private final QueryTransactionContext txContext;
 
     private final TransactionTracker txTracker;
 
@@ -47,9 +47,7 @@ public class ScriptTransactionContext implements 
QueryTransactionContext {
 
     /** Constructor. */
     public ScriptTransactionContext(QueryTransactionContext txContext, 
TransactionTracker txTracker) {
-        assert txContext instanceof QueryTransactionContextImpl : txContext;
-
-        this.txContext = (QueryTransactionContextImpl) txContext;
+        this.txContext = txContext;
         this.txTracker = txTracker;
     }
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/SqlBenchmark.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/SqlBenchmark.java
index 3e841fae1b..e54eddbe68 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/SqlBenchmark.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/SqlBenchmark.java
@@ -19,14 +19,15 @@ package org.apache.ignite.internal.sql.engine.benchmarks;
 
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.ignite.internal.sql.engine.framework.DataProvider;
 import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
 import org.apache.ignite.internal.sql.engine.framework.TestCluster;
 import org.apache.ignite.internal.sql.engine.framework.TestNode;
-import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
-import org.apache.ignite.internal.type.NativeTypes;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -62,27 +63,29 @@ public class SqlBenchmark {
     // @formatter:off
     private final TestCluster cluster = TestBuilders.cluster()
             .nodes("N1", "N2", "N3")
-            .addTable()
-                    .name("T1")
-                    .addKeyColumn("ID", NativeTypes.INT32)
-                    .addColumn("VAL", NativeTypes.stringOf(64))
-                    .end()
-            .dataProvider("N1", "T1", TestBuilders.tableScan(dataProvider))
-            .dataProvider("N2", "T1", TestBuilders.tableScan(dataProvider))
-            .dataProvider("N3", "T1", TestBuilders.tableScan(dataProvider))
             .build();
     // @formatter:on
 
     private final TestNode gatewayNode = cluster.node("N1");
 
-    private QueryPlan plan;
-
     /** Starts the cluster and prepares the plan of the query. */
     @Setup
     public void setUp() {
         cluster.start();
 
-        plan = gatewayNode.prepare("SELECT * FROM t1");
+        //noinspection ConcatenationWithEmptyString
+        cluster.node("N1").initSchema(""
+                + "CREATE ZONE test_zone WITH partitions=3, 
storage_profiles='Default';"
+                + "CREATE TABLE t1 (id INT PRIMARY KEY, val VARCHAR(64)) ZONE 
test_zone");
+
+        cluster.setAssignmentsProvider("T1", (partitionCount, b) -> {
+            assert partitionCount == 3;
+
+            return Stream.of("N1", "N2", "N3")
+                    .map(List::of)
+                    .collect(Collectors.toList());
+        });
+        cluster.setDataProvider("T1", TestBuilders.tableScan(dataProvider));
     }
 
     /** Stops the cluster. */
@@ -94,7 +97,7 @@ public class SqlBenchmark {
     /** Very simple test to measure performance of minimal possible 
distributed query. */
     @Benchmark
     public void selectAllSimple(Blackhole bh) {
-        for (var row : 
await(gatewayNode.executePlan(plan).requestNextAsync(10_000)).items()) {
+        for (var row : await(gatewayNode.executeQuery("SELECT * FROM 
t1").requestNextAsync(10_000)).items()) {
             bh.consume(row);
         }
     }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TransactionEnlistTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TransactionEnlistTest.java
index ca71111124..316ed18a11 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TransactionEnlistTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TransactionEnlistTest.java
@@ -23,12 +23,12 @@ import static org.mockito.Mockito.times;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
-import org.apache.ignite.internal.sql.engine.AsyncSqlCursorImpl;
 import org.apache.ignite.internal.sql.engine.InternalSqlRow;
 import org.apache.ignite.internal.sql.engine.QueryProcessor;
-import org.apache.ignite.internal.sql.engine.SqlQueryType;
 import org.apache.ignite.internal.sql.engine.framework.DataProvider;
 import org.apache.ignite.internal.sql.engine.framework.NoOpTransaction;
 import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
@@ -44,7 +44,6 @@ import 
org.apache.ignite.internal.sql.engine.util.QueryCheckerFactory;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.InternalTransaction;
-import org.apache.ignite.internal.type.NativeTypes;
 import org.apache.ignite.lang.CancellationToken;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterAll;
@@ -62,18 +61,22 @@ public class TransactionEnlistTest extends 
BaseIgniteAbstractTest {
     private static QueryCheckerFactory queryCheckerFactory;
 
     private static final TestCluster CLUSTER = TestBuilders.cluster()
-            .nodes(NODE_NAME1, true)
-            .addTable()
-            .name("T1")
-            .addKeyColumn("ID", NativeTypes.INT32)
-            .addColumn("VAL", NativeTypes.INT32)
-            .end()
-            .dataProvider(NODE_NAME1, "T1", 
TestBuilders.tableScan(DataProvider.fromCollection(List.of())))
+            .nodes(NODE_NAME1)
             .build(); // add method use table partitions
 
     @BeforeAll
      static void startCluster() {
         CLUSTER.start();
+
+        //noinspection ConcatenationWithEmptyString
+        CLUSTER.node("N1").initSchema(""
+                + "CREATE ZONE test_zone WITH partitions=3, 
storage_profiles='Default';"
+                + "CREATE TABLE t1 (id INT PRIMARY KEY, val INT) ZONE 
test_zone");
+
+        CLUSTER.setAssignmentsProvider("T1", (partitionCount, b) -> 
IntStream.range(0, partitionCount)
+                .mapToObj(i -> List.of("N1"))
+                .collect(Collectors.toList()));
+        CLUSTER.setDataProvider("T1", 
TestBuilders.tableScan(DataProvider.fromCollection(List.of())));
     }
 
     @AfterAll
@@ -92,7 +95,7 @@ public class TransactionEnlistTest extends 
BaseIgniteAbstractTest {
 
         try {
             assertQuery("INSERT INTO t1 VALUES(1, 2), (2, 3)", 
spiedTx).check();
-        } catch (Exception ex) {
+        } catch (Exception ignored) {
             // No op.
         }
 
@@ -147,19 +150,7 @@ public class TransactionEnlistTest extends 
BaseIgniteAbstractTest {
             assert params == null || params.length == 0 : "params are not 
supported";
             assert !prepareOnly : "Expected that the query will only be 
prepared, but not executed";
 
-            QueryPlan plan = node.prepare(qry);
-            AsyncDataCursor<InternalSqlRow> dataCursor = 
node.executePlan(plan, transaction);
-
-            SqlQueryType type = plan.type();
-
-            assert type != null;
-
-            AsyncSqlCursor<InternalSqlRow> sqlCursor = new 
AsyncSqlCursorImpl<>(
-                    type,
-                    plan.metadata(),
-                    dataCursor,
-                    null
-            );
+            AsyncSqlCursor<InternalSqlRow> sqlCursor = 
node.executeQuery(transaction, qry);
 
             return CompletableFuture.completedFuture(sqlCursor);
         }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/coercion/BaseTypeCheckExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/coercion/BaseTypeCheckExecutionTest.java
index 468758d423..0992ed5f1f 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/coercion/BaseTypeCheckExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/coercion/BaseTypeCheckExecutionTest.java
@@ -22,13 +22,16 @@ import static 
org.junit.jupiter.api.Assertions.assertNotNull;
 
 import java.math.BigDecimal;
 import java.math.RoundingMode;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
 import org.apache.ignite.internal.sql.engine.InternalSqlRow;
 import org.apache.ignite.internal.sql.engine.framework.DataProvider;
 import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
 import org.apache.ignite.internal.sql.engine.framework.TestCluster;
 import org.apache.ignite.internal.sql.engine.framework.TestNode;
 import org.apache.ignite.internal.sql.engine.planner.datatypes.utils.TypePair;
-import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
 import org.apache.ignite.internal.sql.engine.util.CursorUtils;
 import org.apache.ignite.internal.sql.engine.util.SqlTestUtils;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
@@ -38,7 +41,6 @@ import org.apache.ignite.internal.type.NativeTypes;
 import org.apache.ignite.internal.util.Pair;
 import org.apache.ignite.sql.ColumnMetadata;
 import org.apache.ignite.sql.ColumnType;
-import org.apache.ignite.sql.ResultSetMetadata;
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
 import org.hamcrest.Matcher;
@@ -225,7 +227,11 @@ class BaseTypeCheckExecutionTest extends 
BaseIgniteAbstractTest {
                 .addColumn("C1", typePair.first())
                 .addColumn("C2", typePair.second())
                 .end()
-                .dataProvider("N1", "T", TestBuilders.tableScan(dataProvider))
+                .defaultAssignmentsProvider(tableName -> (partNum, 
includeBackups) -> IntStream.range(0, partNum)
+                        .mapToObj(part -> List.of("N1"))
+                        .collect(Collectors.toList())
+                )
+                .defaultDataProvider(tableName -> 
TestBuilders.tableScan(dataProvider))
                 .build();
 
         return new ClusterWrapper(cluster);
@@ -242,11 +248,11 @@ class BaseTypeCheckExecutionTest extends 
BaseIgniteAbstractTest {
 
         void process(String sql, Matcher<Object> resultMatcher) {
             TestNode gatewayNode = cluster.node("N1");
-            QueryPlan plan = gatewayNode.prepare(sql);
-            ResultSetMetadata resultMeta = plan.metadata();
-            ColumnMetadata colMeta = resultMeta.columns().get(0);
 
-            for (InternalSqlRow row : 
CursorUtils.getAllFromCursor(gatewayNode.executePlan(plan))) {
+            AsyncSqlCursor<InternalSqlRow> cursor = 
gatewayNode.executeQuery(sql);
+            ColumnMetadata colMeta = cursor.metadata().columns().get(0);
+
+            for (InternalSqlRow row : CursorUtils.getAllFromCursor(cursor)) {
                 assertNotNull(row);
                 assertNotNull(row.get(0), "Await not null object");
                 assertThat(new Pair<>(row.get(0), colMeta), resultMatcher);
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java
index 1bab8c4985..91ddbe6837 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java
@@ -39,7 +39,9 @@ import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.NetworkMessageHandler;
+import org.apache.ignite.internal.network.TopologyEventHandler;
 import org.apache.ignite.internal.network.TopologyService;
+import org.apache.ignite.internal.network.UnresolvableConsistentIdException;
 import 
org.apache.ignite.internal.network.serialization.MessageSerializationRegistry;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.NetworkAddress;
@@ -120,6 +122,22 @@ public class ClusterServiceFactory {
             public CompletableFuture<Void> startAsync(ComponentContext 
componentContext) {
                 return nullCompletedFuture();
             }
+
+            @Override
+            public CompletableFuture<Void> stopAsync(ComponentContext 
componentContext) {
+                ClusterNode node = nodeByName.remove(nodeName);
+
+                if (node != null) {
+                    messagingServicesByNode.remove(nodeName);
+                    topologyServicesByNode.remove(nodeName);
+
+                    topologyServicesByNode.values().stream()
+                            .flatMap(topSrvc -> 
topSrvc.getEventHandlers().stream())
+                            .forEach(eventHandler -> 
eventHandler.onDisappeared(node));
+                }
+
+                return nullCompletedFuture();
+            }
         };
     }
 
@@ -150,6 +168,11 @@ public class ClusterServiceFactory {
             return new ClusterNodeImpl(randomUUID(), name, 
NetworkAddress.from("127.0.0.1:" + NODE_COUNTER.incrementAndGet()));
         }
 
+        @Override
+        public Collection<TopologyEventHandler> getEventHandlers() {
+            return super.getEventHandlers();
+        }
+
         /** {@inheritDoc} */
         @Override
         public ClusterNode localMember() {
@@ -205,7 +228,13 @@ public class ClusterServiceFactory {
 
         @Override
         public CompletableFuture<Void> send(String recipientConsistentId, 
ChannelType channelType, NetworkMessage msg) {
-            for (var handler : 
messagingServicesByNode.get(recipientConsistentId).messageHandlers(msg.groupType()))
 {
+            LocalMessagingService recipient = 
messagingServicesByNode.get(recipientConsistentId);
+
+            if (recipient == null) {
+                return CompletableFuture.failedFuture(new 
UnresolvableConsistentIdException(recipientConsistentId));
+            }
+
+            for (NetworkMessageHandler handler : 
recipient.messageHandlers(msg.groupType())) {
                 handler.onReceived(msg, localNode, null);
             }
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
index 886a7a4b2f..e221084a4c 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
@@ -41,7 +41,8 @@ public final class NoOpTransaction implements 
InternalTransaction {
 
     private final UUID id = randomUUID();
 
-    private final HybridTimestamp hybridTimestamp = new HybridTimestamp(1, 1);
+    private final HybridTimestamp hybridTimestamp = new HybridTimestamp(1, 1)
+            .addPhysicalTime(System.currentTimeMillis());
 
     private final IgniteBiTuple<ClusterNode, Long> tuple;
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransactionTracker.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransactionTracker.java
index de3b7ece40..d87d66d48b 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransactionTracker.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransactionTracker.java
@@ -23,8 +23,8 @@ import 
org.apache.ignite.internal.sql.engine.exec.TransactionTracker;
 /**
  * Dummy no-op tracker.
  */
-final class NoOpTransactionTracker implements TransactionTracker {
-    static final TransactionTracker INSTANCE = new NoOpTransactionTracker();
+public final class NoOpTransactionTracker implements TransactionTracker {
+    public static final TransactionTracker INSTANCE = new 
NoOpTransactionTracker();
 
     @Override
     public boolean register(UUID txId, boolean readOnly) {
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index 471f6a59bd..8af2224586 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -44,8 +44,10 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -176,11 +178,11 @@ public class TestBuilders {
     }
 
     /**
-     * Factory method to create {@link ScannableTable table} instance from 
given data provider with
-     * only implemented {@link ScannableTable#scan table scan}.
+     * Factory method to create {@link ScannableTable table} instance from 
given data provider with only implemented
+     * {@link ScannableTable#scan table scan}.
      */
     public static ScannableTable tableScan(DataProvider<Object[]> 
dataProvider) {
-        return new ScannableTable() {
+        return new AbstractScannableTable() {
             @Override
             public <RowT> Publisher<RowT> scan(
                     ExecutionContext<RowT> ctx,
@@ -199,36 +201,39 @@ public class TestBuilders {
                         rowFactory::create
                 );
             }
+        };
+    }
 
+    /**
+     * Factory method to create {@link ScannableTable table} instance from 
given function generating rows by node name and partition id, with only implemented
+     * {@link ScannableTable#scan table scan}.
+     */
+    public static ScannableTable tableScan(BiFunction<String, Integer, 
Iterable<Object[]>> generatorFunction) {
+        return new AbstractScannableTable() {
             @Override
-            public <RowT> Publisher<RowT> 
indexRangeScan(ExecutionContext<RowT> ctx, PartitionWithConsistencyToken 
partWithConsistencyToken,
-                    RowFactory<RowT> rowFactory, int indexId, List<String> 
columns, @Nullable RangeCondition<RowT> cond,
-                    @Nullable BitSet requiredColumns) {
-                throw new UnsupportedOperationException();
-            }
-
-            @Override
-            public <RowT> Publisher<RowT> indexLookup(ExecutionContext<RowT> 
ctx, PartitionWithConsistencyToken partWithConsistencyToken,
-                    RowFactory<RowT> rowFactory, int indexId, List<String> 
columns, RowT key, @Nullable BitSet requiredColumns) {
-                throw new UnsupportedOperationException();
-            }
-
-            @Override
-            public <RowT> CompletableFuture<@Nullable RowT> 
primaryKeyLookup(ExecutionContext<RowT> ctx, InternalTransaction explicitTx,
-                    RowFactory<RowT> rowFactory, RowT key, @Nullable BitSet 
requiredColumns) {
-                throw new UnsupportedOperationException();
-            }
+            public <RowT> Publisher<RowT> scan(
+                    ExecutionContext<RowT> ctx,
+                    PartitionWithConsistencyToken partWithConsistencyToken,
+                    RowFactory<RowT> rowFactory,
+                    @Nullable BitSet requiredColumns
+            ) {
 
-            @Override
-            public CompletableFuture<Long> estimatedSize() {
-                return 
CompletableFuture.completedFuture(dataProvider.estimatedSize());
+                return new TransformingPublisher<>(
+                        SubscriptionUtils.fromIterable(
+                                () -> new TransformingIterator<>(
+                                        
generatorFunction.apply(ctx.localNode().name(), 
partWithConsistencyToken.partId()).iterator(),
+                                        row -> project(row, requiredColumns)
+                                )
+                        ),
+                        rowFactory::create
+                );
             }
         };
     }
 
     /**
-     * Factory method to create {@link ScannableTable table} instance from 
given data provider with
-     * only implemented {@link ScannableTable#indexRangeScan index range scan}.
+     * Factory method to create {@link ScannableTable table} instance from 
given data provider with only implemented
+     * {@link ScannableTable#indexRangeScan index range scan}.
      */
     public static ScannableTable indexRangeScan(DataProvider<Object[]> 
dataProvider) {
         return new ScannableTable() {
@@ -277,8 +282,8 @@ public class TestBuilders {
     }
 
     /**
-     * Factory method to create {@link ScannableTable table} instance from 
given data provider with
-     * only implemented {@link ScannableTable#indexLookup index lookup}.
+     * Factory method to create {@link ScannableTable table} instance from 
given data provider with only implemented
+     * {@link ScannableTable#indexLookup index lookup}.
      */
     public static ScannableTable indexLookup(DataProvider<Object[]> 
dataProvider) {
         return new ScannableTable() {
@@ -342,17 +347,6 @@ public class TestBuilders {
          */
         ClusterBuilder nodes(String firstNodeName, String... otherNodeNames);
 
-        /**
-         * Sets desired names for the cluster nodes.
-         *
-         * @param firstNodeName A name of the first node. There is no 
difference in what node should be first. This parameter was
-         *         introduced to force user to provide at least one node name.
-         * @param useTablePartitions If {@code true} map table partitions to 
whole defined nodes.
-         * @param otherNodeNames An array of rest of the names to create 
cluster from.
-         * @return {@code this} for chaining.
-         */
-        public ClusterBuilder nodes(String firstNodeName, boolean 
useTablePartitions, String... otherNodeNames);
-
         /**
          * Creates a table builder to add to the cluster.
          *
@@ -364,8 +358,8 @@ public class TestBuilders {
          * Adds the given system view to the cluster.
          *
          * @param systemView System view.
-         * @return {@code this} for chaining.
          * @param <T> System view data type.
+         * @return {@code this} for chaining.
          */
         <T> ClusterBuilder addSystemView(SystemView<T> systemView);
 
@@ -377,14 +371,20 @@ public class TestBuilders {
         TestCluster build();
 
         /**
-         * Provides implementation of table with given name local per given 
node.
+         * Sets the default provider of table implementations.
+         *
+         * @param defaultDataProvider Provider returning a table implementation 
for the given table name.
+         * @return {@code this} for chaining.
+         */
+        ClusterBuilder defaultDataProvider(DefaultDataProvider 
defaultDataProvider);
+
+        /**
+         * Sets the default provider of table partition assignments.
          *
-         * @param nodeName Name of the node given instance of table will be 
assigned to.
-         * @param tableName Name of the table given instance represents.
-         * @param table Actual table that will be used for read operations 
during execution.
+         * @param defaultAssignmentsProvider Provider returning an assignments 
provider for the given table name.
          * @return {@code this} for chaining.
          */
-        ClusterBuilder dataProvider(String nodeName, String tableName, 
ScannableTable table);
+        ClusterBuilder defaultAssignmentsProvider(DefaultAssignmentsProvider 
defaultAssignmentsProvider);
 
         /**
          * Registers a previously added system view (see {@link 
#addSystemView(SystemView)}) on the specified node.
@@ -599,27 +599,16 @@ public class TestBuilders {
     private static class ClusterBuilderImpl implements ClusterBuilder {
         private final List<ClusterTableBuilderImpl> tableBuilders = new 
ArrayList<>();
         private List<String> nodeNames;
-        private boolean useTablePartitions;
-        private final Map<String, Map<String, ScannableTable>> 
nodeName2tableName2table = new HashMap<>();
         private final List<SystemView<?>> systemViews = new ArrayList<>();
         private final Map<String, Set<String>> nodeName2SystemView = new 
HashMap<>();
 
-        /** {@inheritDoc} */
-        @Override
-        public ClusterBuilder nodes(String firstNodeName, String... 
otherNodeNames) {
-            this.nodeNames = new ArrayList<>();
-
-            nodeNames.add(firstNodeName);
-            nodeNames.addAll(Arrays.asList(otherNodeNames));
-
-            return this;
-        }
+        private @Nullable DefaultDataProvider defaultDataProvider = null;
+        private @Nullable DefaultAssignmentsProvider 
defaultAssignmentsProvider = null;
 
         /** {@inheritDoc} */
         @Override
-        public ClusterBuilder nodes(String firstNodeName, boolean 
useTablePartitions, String... otherNodeNames) {
+        public ClusterBuilder nodes(String firstNodeName, String... 
otherNodeNames) {
             this.nodeNames = new ArrayList<>();
-            this.useTablePartitions = useTablePartitions;
 
             nodeNames.add(firstNodeName);
             nodeNames.addAll(Arrays.asList(otherNodeNames));
@@ -640,15 +629,22 @@ public class TestBuilders {
         }
 
         @Override
-        public ClusterBuilder dataProvider(String nodeName, String tableName, 
ScannableTable table) {
-            nodeName2tableName2table.computeIfAbsent(nodeName, key -> new 
HashMap<>()).put(tableName, table);
+        public ClusterBuilder registerSystemView(String nodeName, String 
systemViewName) {
+            nodeName2SystemView.computeIfAbsent(nodeName, key -> new 
HashSet<>()).add(systemViewName);
 
             return this;
         }
 
         @Override
-        public ClusterBuilder registerSystemView(String nodeName, String 
systemViewName) {
-            nodeName2SystemView.computeIfAbsent(nodeName, key -> new 
HashSet<>()).add(systemViewName);
+        public ClusterBuilder defaultDataProvider(DefaultDataProvider 
defaultDataProvider) {
+            this.defaultDataProvider = defaultDataProvider;
+
+            return this;
+        }
+
+        @Override
+        public ClusterBuilder 
defaultAssignmentsProvider(DefaultAssignmentsProvider 
defaultAssignmentsProvider) {
+            this.defaultAssignmentsProvider = defaultAssignmentsProvider;
 
             return this;
         }
@@ -656,8 +652,6 @@ public class TestBuilders {
         /** {@inheritDoc} */
         @Override
         public TestCluster build() {
-            validateConfiguredDataProviders();
-
             var clusterService = new ClusterServiceFactory(nodeNames);
 
             var clusterName = "test_cluster";
@@ -674,13 +668,6 @@ public class TestBuilders {
                     new DdlSqlToCommandConverter(), PLANNING_TIMEOUT, 
PLANNING_THREAD_COUNT,
                     new NoOpMetricManager(), schemaManager);
 
-            Map<String, List<List<String>>> owningNodesByTableName = new 
HashMap<>();
-            for (Entry<String, Map<String, ScannableTable>> entry : 
nodeName2tableName2table.entrySet()) {
-                for (String tableName : entry.getValue().keySet()) {
-                    owningNodesByTableName.computeIfAbsent(tableName, key -> 
new ArrayList<>()).add(List.of(entry.getKey()));
-                }
-            }
-
             Map<String, List<String>> systemViewsByNode = new HashMap<>();
 
             for (Entry<String, Set<String>> entry : 
nodeName2SystemView.entrySet()) {
@@ -717,13 +704,20 @@ public class TestBuilders {
                     })
                     .collect(Collectors.toList());
 
+            ConcurrentMap<String, ScannableTable> dataProvidersByTableName = 
new ConcurrentHashMap<>();
+            ConcurrentMap<String, AssignmentsProvider> 
assignmentsProviderByTableName = new ConcurrentHashMap<>();
+            DefaultDataProvider defaultDataProvider = this.defaultDataProvider;
             Map<String, TestNode> nodes = nodeNames.stream()
                     .map(name -> {
                         var systemViewManager = new 
SystemViewManagerImpl(name, catalogManager);
                         var executionProvider = new 
TestExecutionDistributionProvider(
                                 systemViewManager::owningNodes,
-                                owningNodesByTableName,
-                                useTablePartitions
+                                tableName -> resolveProvider(
+                                        tableName,
+                                        assignmentsProviderByTableName,
+                                        defaultAssignmentsProvider != null ? 
defaultAssignmentsProvider::get : null
+                                ),
+                                false
                         );
                         var partitionPruner = new PartitionPrunerImpl();
                         var mappingService = new MappingServiceImpl(
@@ -743,12 +737,20 @@ public class TestBuilders {
 
                         return new TestNode(
                                 name,
+                                catalogManager,
                                 clusterService.forNode(name),
                                 parserService,
                                 prepareService,
                                 schemaManager,
                                 mappingService,
-                                new 
TestExecutableTableRegistry(nodeName2tableName2table.get(name), schemaManager),
+                                new TestExecutableTableRegistry(
+                                        name0 -> resolveProvider(
+                                                name0,
+                                                dataProvidersByTableName,
+                                                defaultDataProvider != null ? 
defaultDataProvider::get : null
+                                        ),
+                                        schemaManager
+                                ),
                                 ddlHandler,
                                 systemViewManager
                         );
@@ -756,6 +758,8 @@ public class TestBuilders {
                     .collect(Collectors.toMap(TestNode::name, 
Function.identity()));
 
             return new TestCluster(
+                    dataProvidersByTableName,
+                    assignmentsProviderByTableName,
                     nodes,
                     catalogManager,
                     prepareService,
@@ -764,29 +768,6 @@ public class TestBuilders {
             );
         }
 
-        private void validateConfiguredDataProviders() {
-            Set<String> dataProvidersOwners = new 
HashSet<>(nodeName2tableName2table.keySet());
-
-            dataProvidersOwners.removeAll(Set.copyOf(nodeNames));
-
-            if (!dataProvidersOwners.isEmpty()) {
-                Map<String, List<String>> problematicTables = new HashMap<>();
-
-                for (String outsiderNode : dataProvidersOwners) {
-                    for (String problematicTable : 
nodeName2tableName2table.get(outsiderNode).keySet()) {
-                        problematicTables.computeIfAbsent(problematicTable, k 
-> new ArrayList<>()).add(outsiderNode);
-                    }
-                }
-
-                String problematicTablesString = 
problematicTables.entrySet().stream()
-                        .map(e -> e.getKey() + ": " + e.getValue())
-                        .collect(Collectors.joining(", "));
-
-                throw new AssertionError(format("The table has a dataProvider 
that is outside the cluster "
-                        + "[{}]", problematicTablesString));
-            }
-        }
-
         private void initAction(CatalogManager catalogManager) {
             List<CatalogCommand> initialSchema = tableBuilders.stream()
                     .flatMap(builder -> builder.build().stream())
@@ -992,7 +973,8 @@ public class TestBuilders {
                             NativeTypes.INT32,
                             DefaultValueStrategy.DEFAULT_COMPUTED,
                             () -> {
-                                throw new AssertionError("Partition virtual 
column is generated by a function"); }
+                                throw new AssertionError("Partition virtual 
column is generated by a function");
+                            }
                     ))), distribution);
 
             Map<String, IgniteIndex> indexes = indexBuilders.stream()
@@ -1442,10 +1424,10 @@ public class TestBuilders {
     }
 
     private static class TestExecutableTableRegistry implements 
ExecutableTableRegistry {
-        private final Map<String, ScannableTable> tablesByName;
+        private final Function<String, ScannableTable> tablesByName;
         private final SqlSchemaManager schemaManager;
 
-        TestExecutableTableRegistry(Map<String, ScannableTable> tablesByName, 
SqlSchemaManager schemaManager) {
+        TestExecutableTableRegistry(Function<String, ScannableTable> 
tablesByName, SqlSchemaManager schemaManager) {
             this.tablesByName = tablesByName;
             this.schemaManager = schemaManager;
         }
@@ -1459,7 +1441,7 @@ public class TestBuilders {
             return CompletableFuture.completedFuture(new ExecutableTable() {
                 @Override
                 public ScannableTable scannableTable() {
-                    ScannableTable scannableTable = 
tablesByName.get(table.name());
+                    ScannableTable scannableTable = 
tablesByName.apply(table.name());
 
                     assert scannableTable != null;
 
@@ -1571,8 +1553,8 @@ public class TestBuilders {
         }
 
         /**
-         * Sets a function that returns system views. Function accepts a view 
name and returns a list of nodes
-         * a system view is available at.
+         * Sets a function that returns system views. Function accepts a view 
name and returns a list of nodes a system view is available
+         * at.
          */
         public ExecutionDistributionProviderBuilder 
setSystemViews(Function<String, List<String>> systemViews) {
             this.owningNodesBySystemViewName = systemViews;
@@ -1587,9 +1569,22 @@ public class TestBuilders {
 
         /** Creates an instance of {@link ExecutionDistributionProvider}. */
         public ExecutionDistributionProvider build() {
+            Map<String, List<List<String>>> owningNodesByTableName = 
Map.copyOf(this.owningNodesByTableName);
+
+            Function<String, AssignmentsProvider> sourceProviderFunction = 
tableName ->
+                    (AssignmentsProvider) (partitionsCount, includeBackups) -> 
{
+                        List<List<String>> assignments = 
owningNodesByTableName.get(tableName);
+
+                        if (nullOrEmpty(assignments)) {
+                            throw new AssertionError("Assignments are not 
configured for table " + tableName);
+                        }
+
+                        return assignments;
+                    };
+
             return new TestExecutionDistributionProvider(
                     owningNodesBySystemViewName,
-                    Map.copyOf(owningNodesByTableName),
+                    sourceProviderFunction,
                     useTablePartitions
             );
         }
@@ -1598,17 +1593,17 @@ public class TestBuilders {
     private static class TestExecutionDistributionProvider implements 
ExecutionDistributionProvider {
         final Function<String, List<String>> owningNodesBySystemViewName;
 
-        final Map<String, List<List<String>>> owningNodesByTableName;
+        final Function<String, AssignmentsProvider> owningNodesByTableName;
 
         final boolean useTablePartitions;
 
         private TestExecutionDistributionProvider(
                 Function<String, List<String>> owningNodesBySystemViewName,
-                Map<String, List<List<String>>> owningNodesByTableName,
+                Function<String, AssignmentsProvider> owningNodesByTableName,
                 boolean useTablePartitions
         ) {
             this.owningNodesBySystemViewName = owningNodesBySystemViewName;
-            this.owningNodesByTableName = Map.copyOf(owningNodesByTableName);
+            this.owningNodesByTableName = owningNodesByTableName;
             this.useTablePartitions = useTablePartitions;
         }
 
@@ -1620,9 +1615,25 @@ public class TestBuilders {
         }
 
         @Override
-        public CompletableFuture<List<TokenizedAssignments>> 
forTable(HybridTimestamp operationTime, IgniteTable table,
-                boolean includeBackups) {
-            List<List<String>> owningNodes = 
owningNodesByTableName.get(table.name());
+        public CompletableFuture<List<TokenizedAssignments>> forTable(
+                HybridTimestamp operationTime,
+                IgniteTable table,
+                boolean includeBackups
+        ) {
+            AssignmentsProvider provider = 
owningNodesByTableName.apply(table.name());
+
+            if (provider == null) {
+                return CompletableFuture.failedFuture(
+                        new AssertionError("AssignmentsProvider is not 
configured for table " + table.name())
+                );
+            }
+            List<List<String>> owningNodes = provider.get(table.partitions(), 
includeBackups);
+
+            if (nullOrEmpty(owningNodes) || owningNodes.size() != 
table.partitions()) {
+                throw new AssertionError("Configured AssignmentsProvider 
returns less assignment than expected "
+                        + "[table=" + table.name() + ", 
expectedNumberOfPartitions=" + table.partitions()
+                        + ", returnedAssignmentSize=" + (owningNodes == null ? 
"<null>" : owningNodes.size()) + "]");
+            }
 
             List<TokenizedAssignments> assignments;
 
@@ -1648,10 +1659,94 @@ public class TestBuilders {
 
             if (nullOrEmpty(nodes)) {
                 throw new SqlException(Sql.MAPPING_ERR, format("The view with 
name '{}' could not be found on"
-                                + " any active nodes in the cluster", view));
+                        + " any active nodes in the cluster", view));
             }
 
             return view.distribution() == IgniteDistributions.single() ? 
List.of(nodes.get(0)) : nodes;
         }
     }
+
+    private abstract static class AbstractScannableTable implements 
ScannableTable {
+        @Override
+        public <RowT> Publisher<RowT> scan(
+                ExecutionContext<RowT> ctx,
+                PartitionWithConsistencyToken partWithConsistencyToken,
+                RowFactory<RowT> rowFactory,
+                @Nullable BitSet requiredColumns
+        ) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public <RowT> Publisher<RowT> indexRangeScan(ExecutionContext<RowT> 
ctx, PartitionWithConsistencyToken partWithConsistencyToken,
+                RowFactory<RowT> rowFactory, int indexId, List<String> 
columns, @Nullable RangeCondition<RowT> cond,
+                @Nullable BitSet requiredColumns) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public <RowT> Publisher<RowT> indexLookup(ExecutionContext<RowT> ctx, 
PartitionWithConsistencyToken partWithConsistencyToken,
+                RowFactory<RowT> rowFactory, int indexId, List<String> 
columns, RowT key, @Nullable BitSet requiredColumns) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public <RowT> CompletableFuture<@Nullable RowT> 
primaryKeyLookup(ExecutionContext<RowT> ctx, InternalTransaction explicitTx,
+                RowFactory<RowT> rowFactory, RowT key, @Nullable BitSet 
requiredColumns) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CompletableFuture<Long> estimatedSize() {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    /**
+     * Data provider that will be used in case no other provider was specified 
explicitly via
+     * {@link TestCluster#setDataProvider(String, ScannableTable)}.
+     */
+    @FunctionalInterface
+    public interface DefaultDataProvider {
+        ScannableTable get(String tableName);
+    }
+
+    /**
+     * Assignments provider that will be used in case no other provider was 
specified explicitly via
+     * {@link TestCluster#setAssignmentsProvider(String, AssignmentsProvider)}.
+     */
+    @FunctionalInterface
+    public interface DefaultAssignmentsProvider {
+        AssignmentsProvider get(String tableName);
+    }
+
+    /** Provider of assignments for a table. */
+    @FunctionalInterface
+    public interface AssignmentsProvider {
+        /**
+         * Returns the list of assignments.
+         *
+         * <p>Returned list must have the same number of elements as provided 
{@code partitionsCount}. If {@code includeBackups} is set to
+         * {@code true}, then every sublist is allowed to have more than one 
element.
+         *
+         * @param partitionsCount Number of partitions in a table.
+         * @param includeBackups Whether to include backup assignments or not.
+         * @return List of assignments.
+         */
+        List<List<String>> get(int partitionsCount, boolean includeBackups);
+    }
+
+    private static <T> @Nullable T resolveProvider(
+            String tableName,
+            Map<String, T> providersByTableName,
+            @Nullable Function<String, T> defaultProvider
+    ) {
+        T provider = providersByTableName.get(tableName);
+
+        if (provider == null && defaultProvider != null) {
+            return defaultProvider.apply(tableName);
+        }
+
+        return provider;
+    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestCluster.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestCluster.java
index 86497b6170..028c0d0ab8 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestCluster.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestCluster.java
@@ -23,6 +23,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.ignite.internal.catalog.CatalogManager;
@@ -30,6 +31,8 @@ import org.apache.ignite.internal.hlc.ClockWaiter;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.sql.engine.exec.LifecycleAware;
+import org.apache.ignite.internal.sql.engine.exec.ScannableTable;
+import 
org.apache.ignite.internal.sql.engine.framework.TestBuilders.AssignmentsProvider;
 import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
 import org.apache.ignite.internal.util.IgniteUtils;
 
@@ -46,14 +49,20 @@ public class TestCluster implements LifecycleAware {
     private final List<LifecycleAware> components;
     private final Runnable initClosure;
     private final CatalogManager catalogManager;
+    private final ConcurrentMap<String, ScannableTable> 
dataProvidersByTableName;
+    private final ConcurrentMap<String, AssignmentsProvider> 
assignmentsProvidersByTableName;
 
     TestCluster(
+            ConcurrentMap<String, ScannableTable> dataProvidersByTableName,
+            ConcurrentMap<String, AssignmentsProvider> 
assignmentsProvidersByTableName,
             Map<String, TestNode> nodeByName,
             CatalogManager catalogManager,
             PrepareService prepareService,
             ClockWaiter clockWaiter,
             Runnable initClosure
     ) {
+        this.dataProvidersByTableName = dataProvidersByTableName;
+        this.assignmentsProvidersByTableName = assignmentsProvidersByTableName;
         this.nodeByName = nodeByName;
         this.components = List.of(
                 new ComponentToLifecycleAwareAdaptor(catalogManager),
@@ -100,6 +109,14 @@ public class TestCluster implements LifecycleAware {
         IgniteUtils.closeAll(closeables);
     }
 
+    public void setAssignmentsProvider(String tableName, AssignmentsProvider 
assignmentsProvider) {
+        assignmentsProvidersByTableName.put(tableName, assignmentsProvider);
+    }
+
+    public void setDataProvider(String tableName, ScannableTable dataProvider) 
{
+        dataProvidersByTableName.put(tableName, dataProvider);
+    }
+
     private static class ComponentToLifecycleAwareAdaptor implements 
LifecycleAware {
         private final IgniteComponent component;
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterTest.java
index 2e4859a49d..fd508f99c1 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterTest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.sql.engine.framework;
 
 import static 
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.convertSqlRows;
-import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
@@ -27,6 +26,7 @@ import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -37,6 +37,8 @@ import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Flow.Publisher;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.sql.engine.InternalSqlRow;
 import org.apache.ignite.internal.sql.engine.exec.AsyncDataCursor;
@@ -48,11 +50,11 @@ import 
org.apache.ignite.internal.sql.engine.exec.exp.RangeCondition;
 import org.apache.ignite.internal.sql.engine.prepare.KeyValueGetPlan;
 import org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan;
 import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
-import org.apache.ignite.internal.sql.engine.prepare.SelectCountPlan;
 import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
 import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
 import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
 import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Collation;
+import org.apache.ignite.internal.sql.engine.util.SqlTestUtils;
 import org.apache.ignite.internal.systemview.api.SystemViews;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.tx.InternalTransaction;
@@ -61,6 +63,7 @@ import org.apache.ignite.internal.util.AsyncCursor;
 import org.apache.ignite.internal.util.AsyncCursor.BatchedResult;
 import org.apache.ignite.internal.util.SubscriptionUtils;
 import org.apache.ignite.internal.util.subscription.TransformingPublisher;
+import org.apache.ignite.lang.ErrorGroups.Sql;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
@@ -139,11 +142,11 @@ public class TestClusterTest extends 
BaseIgniteAbstractTest {
                     .addColumn("ID", Collation.ASC_NULLS_FIRST)
                     .end()
                 .end()
-            .dataProvider("N1", "T1", table)
-            .dataProvider("N2", "T1", table)
-            // table T2 will be created later by DDL
-            .dataProvider("N1", "T2", table)
-            .dataProvider("N2", "T2", table)
+            .defaultAssignmentsProvider(tableName -> (partitionsCount, 
includeBackups) -> IntStream.range(0, partitionsCount)
+                    .mapToObj(part -> List.of(part % 2 == 0 ? "N1" : "N2"))
+                    .collect(Collectors.toList())
+            )
+            .defaultDataProvider(tableName -> table)
             // Register system views
             .addSystemView(SystemViews.<Object[]>clusterViewBuilder()
                     .name("NODES")
@@ -175,37 +178,41 @@ public class TestClusterTest extends 
BaseIgniteAbstractTest {
     public void testSimpleQuery() {
         cluster.start();
 
-        var gatewayNode = cluster.node("N1");
-        var plan = gatewayNode.prepare("SELECT * FROM t1");
+        TestNode gatewayNode = cluster.node("N1");
+        String query = "SELECT * FROM t1";
 
-        for (var row : 
await(gatewayNode.executePlan(plan).requestNextAsync(10_000)).items()) {
-            assertNotNull(row);
-        }
+        QueryPlan plan = gatewayNode.prepare(query);
 
         // Ensure the plan contains full table scan.
-        assertTrue(plan instanceof MultiStepPlan);
-        assertTrue(lastNode(((MultiStepPlan) plan).root()) instanceof 
IgniteTableScan);
+        assertInstanceOf(MultiStepPlan.class, plan);
+        assertInstanceOf(IgniteTableScan.class, lastNode(((MultiStepPlan) 
plan).root()));
+
+        for (var row : 
await(gatewayNode.executeQuery(query).requestNextAsync(10_000)).items()) {
+            assertNotNull(row);
+        }
     }
 
     @Test
     public void testSimpleFromCreatedTableByDdl() {
         cluster.start();
 
-        var gatewayNode = cluster.node("N1");
+        TestNode gatewayNode = cluster.node("N1");
 
         gatewayNode.initSchema(
                 "CREATE TABLE t2 (id INT PRIMARY KEY, val VARCHAR(64))"
         );
 
-        QueryPlan plan = gatewayNode.prepare("SELECT * FROM t2");
+        String query = "SELECT * FROM t2";
 
-        for (var row : 
await(gatewayNode.executePlan(plan).requestNextAsync(10_000)).items()) {
+        QueryPlan plan = gatewayNode.prepare(query);
+
+        for (var row : 
await(gatewayNode.executeQuery(query).requestNextAsync(10_000)).items()) {
             assertNotNull(row);
         }
 
         // Ensure the plan contains full table scan.
-        assertTrue(plan instanceof MultiStepPlan);
-        assertTrue(lastNode(((MultiStepPlan) plan).root()) instanceof 
IgniteTableScan);
+        assertInstanceOf(MultiStepPlan.class, plan);
+        assertInstanceOf(IgniteTableScan.class, lastNode(((MultiStepPlan) 
plan).root()));
     }
 
     @Test
@@ -213,14 +220,16 @@ public class TestClusterTest extends 
BaseIgniteAbstractTest {
         cluster.start();
 
         TestNode gatewayNode = cluster.node("N1");
-        QueryPlan plan = gatewayNode.prepare("SELECT val, 100 FROM t1 WHERE ID 
= 1");
+        String query = "SELECT val, 100 FROM t1 WHERE ID = 1";
+
+        QueryPlan plan = gatewayNode.prepare(query);
 
-        for (InternalSqlRow row : 
await(gatewayNode.executePlan(plan).requestNextAsync(10_000)).items()) {
+        for (InternalSqlRow row : 
await(gatewayNode.executeQuery(query).requestNextAsync(10_000)).items()) {
             assertNotNull(row);
         }
 
         // Ensure the plan uses index.
-        assertTrue(plan instanceof KeyValueGetPlan);
+        assertInstanceOf(KeyValueGetPlan.class, plan);
     }
 
     @Test
@@ -228,15 +237,17 @@ public class TestClusterTest extends 
BaseIgniteAbstractTest {
         cluster.start();
 
         TestNode gatewayNode = cluster.node("N1");
-        QueryPlan plan = gatewayNode.prepare("SELECT * FROM t1 WHERE ID > 1");
+        String query = "SELECT * FROM t1 WHERE ID > 1";
 
-        for (InternalSqlRow row : 
await(gatewayNode.executePlan(plan).requestNextAsync(10_000)).items()) {
+        QueryPlan plan = gatewayNode.prepare(query);
+
+        for (InternalSqlRow row : 
await(gatewayNode.executeQuery(query).requestNextAsync(10_000)).items()) {
             assertNotNull(row);
         }
 
         // Ensure the plan uses index.
-        assertTrue(plan instanceof MultiStepPlan);
-        assertTrue(lastNode(((MultiStepPlan) plan).root()) instanceof 
IgniteIndexScan);
+        assertInstanceOf(MultiStepPlan.class, plan);
+        assertInstanceOf(IgniteIndexScan.class, lastNode(((MultiStepPlan) 
plan).root()));
         assertEquals("SORTED_IDX", ((IgniteIndexScan) 
lastNode(((MultiStepPlan) plan).root())).indexName());
     }
 
@@ -249,9 +260,7 @@ public class TestClusterTest extends BaseIgniteAbstractTest 
{
 
         TestNode stoppedNode = cluster.node("N2");
 
-        QueryPlan plan = gatewayNode.prepare("SELECT * FROM t1 WHERE ID > 1");
-
-        AsyncCursor<InternalSqlRow> cur = gatewayNode.executePlan(plan);
+        AsyncCursor<InternalSqlRow> cur = gatewayNode.executeQuery("SELECT * 
FROM t1 WHERE ID > 1");
 
         await(cur.requestNextAsync(1));
 
@@ -284,9 +293,7 @@ public class TestClusterTest extends BaseIgniteAbstractTest 
{
 
         assertTrue(initiator.clockService().after(initiatorClock.now(), 
otherNodeClock.now()));
 
-        QueryPlan plan = initiator.prepare("SELECT * FROM t1");
-
-        AsyncCursor<InternalSqlRow> cur = initiator.executePlan(plan);
+        AsyncCursor<InternalSqlRow> cur = initiator.executeQuery("SELECT * 
FROM t1");
 
         await(cur.requestNextAsync(1));
 
@@ -310,9 +317,7 @@ public class TestClusterTest extends BaseIgniteAbstractTest 
{
 
         assertTrue(otherNode.clockService().after(otherNodeClock.now(), 
initiatorClock.now()));
 
-        QueryPlan plan = initiator.prepare("SELECT * FROM t1");
-
-        AsyncCursor<InternalSqlRow> cur = initiator.executePlan(plan);
+        AsyncCursor<InternalSqlRow> cur = initiator.executeQuery("SELECT * 
FROM t1");
 
         await(cur.requestNextAsync(10_000));
 
@@ -334,9 +339,11 @@ public class TestClusterTest extends 
BaseIgniteAbstractTest {
         cluster.start();
 
         TestNode gatewayNode = cluster.node("N1");
-        QueryPlan plan = gatewayNode.prepare("SELECT * FROM SYSTEM.NODES, 
SYSTEM.NODE_N2");
 
-        BatchedResult<InternalSqlRow> results = 
await(gatewayNode.executePlan(plan).requestNextAsync(10_000));
+        BatchedResult<InternalSqlRow> results = await(
+                gatewayNode.executeQuery("SELECT * FROM SYSTEM.NODES, 
SYSTEM.NODE_N2")
+                        .requestNextAsync(10_000)
+        );
         List<List<Object>> rows = convertSqlRows(results.items());
 
         assertEquals(List.of(List.of(42L, "mango", "N2", 42)), rows);
@@ -360,8 +367,10 @@ public class TestClusterTest extends 
BaseIgniteAbstractTest {
 
         TestNode gatewayNode = cluster.node("N1");
 
-        SelectCountPlan plan = (SelectCountPlan) gatewayNode.prepare("SELECT 
'hello', COUNT(*) FROM t1");
-        BatchedResult<InternalSqlRow> results = 
await(gatewayNode.executePlan(plan).requestNextAsync(10_000));
+        BatchedResult<InternalSqlRow> results = await(
+                gatewayNode.executeQuery("SELECT 'hello', COUNT(*) FROM t1")
+                        .requestNextAsync(10_000)
+        );
 
         List<List<Object>> rows = convertSqlRows(results.items());
         assertEquals(List.of(List.of("hello", 42L)), rows);
@@ -374,8 +383,7 @@ public class TestClusterTest extends BaseIgniteAbstractTest 
{
         TestNode node = cluster.node("N1");
         Object[] params = {1, null};
 
-        QueryPlan plan = node.prepare("SELECT ?, ?", params);
-        AsyncDataCursor<InternalSqlRow> cursor = node.executePlan(plan, 
params);
+        AsyncDataCursor<InternalSqlRow> cursor = node.executeQuery("SELECT ?, 
?", params);
         BatchedResult<InternalSqlRow> res = await(cursor.requestNextAsync(1));
 
         assertThat(res.hasMore(), is(false));
@@ -391,15 +399,10 @@ public class TestClusterTest extends 
BaseIgniteAbstractTest {
 
         TestNode node = cluster.node("N1");
 
-        QueryPlan plan = node.prepare("SELECT ?/?", 1, 0);
-
-        AsyncDataCursor<InternalSqlRow> cursor = node.executePlan(plan, 1);
-
-        //noinspection ThrowableNotThrown
-        assertThrowsWithCause(
-                () -> await(cursor.requestNextAsync(1)),
-                IllegalStateException.class,
-                "Missing dynamic parameter: ?1"
+        SqlTestUtils.assertThrowsSqlException(
+                Sql.STMT_VALIDATION_ERR,
+                "Unexpected number of query parameters. Provided 1 but there 
is only 2 dynamic parameter(s)",
+                () -> node.executeQuery("SELECT ?/?", 1)
         );
     }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
index fd945aa3b6..db4465535a 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
@@ -25,7 +25,13 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
 import java.util.stream.Collectors;
+import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.failure.FailureContext;
 import org.apache.ignite.internal.failure.FailureManager;
 import org.apache.ignite.internal.failure.handlers.AbstractFailureHandler;
@@ -39,13 +45,12 @@ import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.network.TopologyService;
 import org.apache.ignite.internal.sql.SqlCommon;
+import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
 import org.apache.ignite.internal.sql.engine.InternalSqlRow;
 import org.apache.ignite.internal.sql.engine.QueryCancel;
 import org.apache.ignite.internal.sql.engine.SqlOperationContext;
 import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
 import 
org.apache.ignite.internal.sql.engine.SqlQueryProcessor.PrefetchCallback;
-import org.apache.ignite.internal.sql.engine.SqlQueryType;
-import org.apache.ignite.internal.sql.engine.exec.AsyncDataCursor;
 import org.apache.ignite.internal.sql.engine.exec.ExchangeService;
 import org.apache.ignite.internal.sql.engine.exec.ExchangeServiceImpl;
 import org.apache.ignite.internal.sql.engine.exec.ExecutableTableRegistry;
@@ -61,21 +66,24 @@ import 
org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler;
 import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
 import 
org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunctionRegistryImpl;
+import org.apache.ignite.internal.sql.engine.exec.fsm.QueryExecutor;
 import org.apache.ignite.internal.sql.engine.exec.mapping.MappingService;
 import org.apache.ignite.internal.sql.engine.message.MessageService;
 import org.apache.ignite.internal.sql.engine.message.MessageServiceImpl;
 import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
 import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
+import org.apache.ignite.internal.sql.engine.property.SqlPropertiesHelper;
 import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
 import org.apache.ignite.internal.sql.engine.sql.ParsedResult;
 import org.apache.ignite.internal.sql.engine.sql.ParserService;
 import org.apache.ignite.internal.sql.engine.tx.QueryTransactionContext;
+import org.apache.ignite.internal.sql.engine.util.EmptyCacheFactory;
 import org.apache.ignite.internal.systemview.api.SystemViewManager;
+import 
org.apache.ignite.internal.table.distributed.schema.AlwaysSyncedSchemaSyncService;
 import org.apache.ignite.internal.tx.InternalTransaction;
-import org.apache.ignite.internal.util.AsyncCursor;
+import org.apache.ignite.internal.util.ArrayUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.internal.util.StringUtils;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -85,10 +93,11 @@ import org.jetbrains.annotations.Nullable;
  */
 public class TestNode implements LifecycleAware {
     private final String nodeName;
+    private final QueryExecutor queryExecutor;
     private final PrepareService prepareService;
-    private final ExecutionService executionService;
     private final ParserService parserService;
     private final MessageService messageService;
+    private final ClusterService clusterService;
 
     private final List<LifecycleAware> services = new ArrayList<>();
     volatile boolean exceptionRaised;
@@ -105,6 +114,7 @@ public class TestNode implements LifecycleAware {
      */
     TestNode(
             String nodeName,
+            CatalogService catalogService,
             ClusterService clusterService,
             ParserService parserService,
             PrepareService prepareService,
@@ -117,6 +127,7 @@ public class TestNode implements LifecycleAware {
         this.nodeName = nodeName;
         this.parserService = parserService;
         this.prepareService = prepareService;
+        this.clusterService = clusterService;
 
         TopologyService topologyService = clusterService.topologyService();
         MessagingService messagingService = clusterService.messagingService();
@@ -150,7 +161,7 @@ public class TestNode implements LifecycleAware {
 
         TableFunctionRegistryImpl tableFunctionRegistry = new 
TableFunctionRegistryImpl();
 
-        executionService = registerService(ExecutionServiceImpl.create(
+        ExecutionService executionService = 
registerService(ExecutionServiceImpl.create(
                 topologyService,
                 messageService,
                 schemaManager,
@@ -168,6 +179,33 @@ public class TestNode implements LifecycleAware {
         ));
 
         registerService(new 
IgniteComponentLifecycleAwareAdapter(systemViewManager));
+
+        ScheduledExecutorService scheduler = 
Executors.newScheduledThreadPool(1);
+
+        registerService(new LifecycleAware() {
+            @Override
+            public void start() { }
+
+            @Override
+            public void stop() {
+                scheduler.shutdownNow();
+            }
+        });
+
+        queryExecutor = registerService(new QueryExecutor(
+                EmptyCacheFactory.INSTANCE,
+                0,
+                parserService,
+                taskExecutor,
+                scheduler,
+                clockService,
+                new AlwaysSyncedSchemaSyncService(),
+                prepareService,
+                catalogService,
+                executionService,
+                SqlQueryProcessor.DEFAULT_PROPERTIES,
+                NoOpTransactionTracker.INSTANCE
+        ));
     }
 
     /** {@inheritDoc} */
@@ -181,6 +219,8 @@ public class TestNode implements LifecycleAware {
     public void stop() throws Exception {
         holdLock.block();
 
+        clusterService.stopAsync(new ComponentContext()).join();
+
         List<AutoCloseable> closeables = services.stream()
                 .map(service -> ((AutoCloseable) service::stop))
                 .collect(Collectors.toList());
@@ -210,33 +250,6 @@ public class TestNode implements LifecycleAware {
         return clockService;
     }
 
-    /**
-     * Executes given plan on a cluster this node belongs to
-     * and returns an async cursor representing the result.
-     *
-     * @param plan A plan to execute.
-     * @param transaction External transaction.
-     * @param params Query parameters.
-     * @return A cursor representing the result.
-     */
-    public AsyncDataCursor<InternalSqlRow> executePlan(QueryPlan plan, 
@Nullable InternalTransaction transaction, Object... params) {
-        SqlOperationContext ctx = createContext(transaction, params).build();
-
-        return executionService.executePlan(plan, ctx);
-    }
-
-    /**
-     * Executes given plan on a cluster this node belongs to
-     * and returns an async cursor representing the result.
-     *
-     * @param plan A plan to execute.
-     * @param params Query parameters.
-     * @return A cursor representing the result.
-     */
-    public AsyncDataCursor<InternalSqlRow> executePlan(QueryPlan plan, 
Object... params) {
-        return executePlan(plan, null, params);
-    }
-
     /**
      * Prepares (aka parses, validates, and optimizes) the given query string
      * and returns the plan to execute.
@@ -280,45 +293,56 @@ public class TestNode implements LifecycleAware {
         return await(prepareService.prepareAsync(parsedResult, ctx));
     }
 
-    /**
-     * Executes the given script.
-     *
-     * <p>This method splits given string by semicolon and execute every 
statement
-     * one by one. Technically it may execute SELECT statements as well, but 
since
-     * it returns nothing, it doesn't make any sense.
-     *
-     * @param script Script to execute.
-     */
+    /** Executes the given script. */
     public void initSchema(String script) {
-        for (String statement : script.split(";")) {
-            if (StringUtils.nullOrBlank(statement) || 
statement.trim().startsWith("--")) {
-                continue;
-            }
+        CompletableFuture<AsyncSqlCursor<InternalSqlRow>> cursorFuture = 
queryExecutor.executeQuery(
+                SqlPropertiesHelper.emptyProperties(),
+                ImplicitTxContext.INSTANCE,
+                script,
+                null,
+                ArrayUtils.OBJECT_EMPTY_ARRAY
+        );
 
-            ParsedResult parsedResult = parserService.parse(statement);
-            SqlOperationContext ctx = createContext().build();
+        var consumer = new Function<AsyncSqlCursor<?>, 
CompletionStage<AsyncSqlCursor<?>>>() {
+            @Override
+            public CompletionStage<AsyncSqlCursor<?>> apply(AsyncSqlCursor<?> 
cursor) {
+                CompletableFuture<Void> closeFuture = cursor.closeAsync();
 
-            QueryPlan plan = await(prepareService.prepareAsync(parsedResult, 
ctx));
+                if (cursor.hasNextResult()) {
+                    return cursor.nextResult().thenCompose(this);
+                }
 
-            if (plan.type() != SqlQueryType.DDL && plan.type() != 
SqlQueryType.DML) {
-                continue;
+                return closeFuture.thenApply(none -> cursor);
             }
+        };
 
-            AsyncCursor<?> cursor = executionService.executePlan(plan, ctx);
+        await(cursorFuture.thenCompose(consumer));
+    }
 
-            await(cursor.requestNextAsync(1));
-        }
+    /** Executes the given query. */
+    public AsyncSqlCursor<InternalSqlRow> executeQuery(@Nullable 
InternalTransaction tx, String query, Object... params) {
+        QueryTransactionContext txContext = tx == null ? 
ImplicitTxContext.INSTANCE : ExplicitTxContext.fromTx(tx);
+
+        return executeQuery(txContext, query, params);
     }
 
-    private SqlOperationContext.Builder createContext() {
-        return createContext(ImplicitTxContext.INSTANCE);
+    /** Executes the given query. */
+    public AsyncSqlCursor<InternalSqlRow> executeQuery(QueryTransactionContext 
txContext, String query, Object... params) {
+        return await(queryExecutor.executeQuery(
+                SqlPropertiesHelper.emptyProperties(),
+                txContext,
+                query,
+                null,
+                params
+        ));
     }
 
-    private SqlOperationContext.Builder createContext(@Nullable 
InternalTransaction tx, Object... params) {
-        return createContext(tx == null ? ImplicitTxContext.INSTANCE : 
ExplicitTxContext.fromTx(tx), params);
+    /** Executes the given query. */
+    public AsyncSqlCursor<InternalSqlRow> executeQuery(String query, Object... 
params) {
+        return executeQuery((InternalTransaction) null, query, params);
     }
 
-    private SqlOperationContext.Builder createContext(QueryTransactionContext 
txContext, Object... params) {
+    private SqlOperationContext.Builder createContext() {
         UUID queryId = UUID.randomUUID();
 
         return SqlOperationContext.builder()
@@ -327,8 +351,8 @@ public class TestNode implements LifecycleAware {
                 .operationTime(clock.now())
                 .defaultSchemaName(SqlCommon.DEFAULT_SCHEMA_NAME)
                 .timeZoneId(SqlQueryProcessor.DEFAULT_TIME_ZONE_ID)
-                .txContext(txContext)
-                .parameters(params)
+                .txContext(ImplicitTxContext.INSTANCE)
+                .parameters()
                 .prefetchCallback(new PrefetchCallback());
     }
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java
index 75d4a8ddbd..625dbaad41 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java
@@ -24,13 +24,12 @@ import static org.hamcrest.Matchers.containsString;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
-import org.apache.ignite.internal.sql.engine.AsyncSqlCursorImpl;
 import org.apache.ignite.internal.sql.engine.InternalSqlRow;
 import org.apache.ignite.internal.sql.engine.QueryProcessor;
-import org.apache.ignite.internal.sql.engine.SqlQueryType;
-import org.apache.ignite.internal.sql.engine.exec.AsyncDataCursor;
 import org.apache.ignite.internal.sql.engine.framework.DataProvider;
 import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
 import org.apache.ignite.internal.sql.engine.framework.TestCluster;
@@ -42,7 +41,6 @@ import 
org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.testframework.WithSystemProperty;
 import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.InternalTransaction;
-import org.apache.ignite.internal.type.NativeTypes;
 import org.apache.ignite.lang.CancellationToken;
 import org.apache.ignite.sql.ColumnMetadata;
 import org.apache.ignite.sql.ColumnType;
@@ -64,23 +62,25 @@ public class QueryCheckerTest extends 
BaseIgniteAbstractTest {
     @InjectQueryCheckerFactory
     private static QueryCheckerFactory queryCheckerFactory;
 
-    // @formatter:off
     private static final TestCluster CLUSTER = TestBuilders.cluster()
             .nodes(NODE_NAME)
-            .addTable()
-                    .name("T1")
-                    .addKeyColumn("ID", NativeTypes.INT32)
-                    .addColumn("VAL", NativeTypes.INT32)
-                    .end()
-            .dataProvider(NODE_NAME, "T1", 
TestBuilders.tableScan(DataProvider.fromCollection(List.of(
-                    new Object[] {1, 1, 1}, new Object[] {2, 2, 1}
-            ))))
             .build();
-    // @formatter:on
 
     @BeforeAll
     static void startCluster() {
         CLUSTER.start();
+
+        //noinspection ConcatenationWithEmptyString
+        CLUSTER.node("N1").initSchema(""
+                + "CREATE ZONE test_zone WITH partitions=1, 
storage_profiles='Default';"
+                + "CREATE TABLE t1 (id INT PRIMARY KEY, val INT) ZONE 
test_zone");
+
+        CLUSTER.setAssignmentsProvider("T1", (partitionCount, b) -> 
IntStream.range(0, partitionCount)
+                .mapToObj(i -> List.of("N1"))
+                .collect(Collectors.toList()));
+        CLUSTER.setDataProvider("T1", 
TestBuilders.tableScan(DataProvider.fromCollection(
+                List.of(new Object[]{1, 1, 1}, new Object[]{2, 2, 1})
+        )));
     }
 
     @AfterAll
@@ -331,19 +331,7 @@ public class QueryCheckerTest extends 
BaseIgniteAbstractTest {
             assert params == null || params.length == 0 : "params are not 
supported";
             assert !prepareOnly : "Expected that the query will only be 
prepared, but not executed";
 
-            QueryPlan plan = node.prepare(qry);
-            AsyncDataCursor<InternalSqlRow> dataCursor = 
node.executePlan(plan);
-
-            SqlQueryType type = plan.type();
-
-            assert type != null;
-
-            AsyncSqlCursor<InternalSqlRow> sqlCursor = new 
AsyncSqlCursorImpl<>(
-                    type,
-                    plan.metadata(),
-                    dataCursor,
-                    null
-            );
+            AsyncSqlCursor<InternalSqlRow> sqlCursor = node.executeQuery(qry);
 
             return CompletableFuture.completedFuture(sqlCursor);
         }

Reply via email to