This is an automated email from the ASF dual-hosted git repository.

korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 8d7643f944 IGNITE-20714 Sql. Test Framework. Support DDL scripts to 
initialise schema (#2752)
8d7643f944 is described below

commit 8d7643f9448fb341d488e3081babe465c5b39b7d
Author: korlov42 <[email protected]>
AuthorDate: Thu Oct 26 17:14:33 2023 +0300

    IGNITE-20714 Sql. Test Framework. Support DDL scripts to initialise schema 
(#2752)
---
 .../internal/catalog/CatalogManagerImpl.java       |   7 +-
 .../internal/sql/engine/prepare/PlannerPhase.java  |   3 -
 .../RepeatedRandomRowDataProviderFactory.java      |  45 --
 .../sql/engine/benchmarks/SqlBenchmark.java        |   9 +-
 .../{TpchQueries.java => TpchHelper.java}          |  14 +-
 .../sql/engine/benchmarks/TpchParseBenchmark.java  |   4 +-
 .../engine/benchmarks/TpchPrepareBenchmark.java    |  14 +-
 .../internal/sql/engine/benchmarks/TpchSchema.java | 165 -----
 .../sql/engine/framework/TestBuilders.java         | 739 ++++++++++++++-------
 .../internal/sql/engine/framework/TestCluster.java |  23 +-
 .../sql/engine/framework/TestClusterTest.java      | 130 +++-
 .../internal/sql/engine/framework/TestIndex.java   |  42 +-
 .../internal/sql/engine/framework/TestNode.java    | 102 ++-
 .../internal/sql/engine/util/QueryCheckerTest.java |   8 +-
 .../test/resources/tpch/schema_definition_ddl.sql  | 125 ++++
 15 files changed, 858 insertions(+), 572 deletions(-)

diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
index 87bfafb5f1..9b475644d5 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
@@ -26,6 +26,7 @@ import static 
org.apache.ignite.internal.catalog.CatalogParamsValidationUtils.va
 import static 
org.apache.ignite.internal.catalog.CatalogParamsValidationUtils.validateRenameZoneParams;
 import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.fromParams;
 import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.fromParamsAndPreviousValue;
+import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -317,7 +318,11 @@ public class CatalogManagerImpl extends 
AbstractEventProducer<CatalogEvent, Cata
     }
 
     @Override
-    public CompletableFuture<Void> execute(List<CatalogCommand> commands) 
throws IllegalArgumentException {
+    public CompletableFuture<Void> execute(List<CatalogCommand> commands) {
+        if (nullOrEmpty(commands)) {
+            return completedFuture(null);
+        }
+
         return saveUpdateAndWaitForActivation(new 
BulkUpdateProducer(List.copyOf(commands)));
     }
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
index adda00bccb..e40d976799 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
@@ -126,9 +126,6 @@ public enum PlannerPhase {
             FilterMergeRule.Config.DEFAULT
                     .withOperandFor(LogicalFilter.class).toRule(),
 
-            JoinPushThroughJoinRule.Config.LEFT
-                    .withOperandFor(LogicalJoin.class).toRule(),
-
             JoinPushThroughJoinRule.Config.RIGHT
                     .withOperandFor(LogicalJoin.class).toRule(),
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/RepeatedRandomRowDataProviderFactory.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/RepeatedRandomRowDataProviderFactory.java
deleted file mode 100644
index 40bba39604..0000000000
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/RepeatedRandomRowDataProviderFactory.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.sql.engine.benchmarks;
-
-import java.util.List;
-import org.apache.ignite.internal.sql.engine.framework.DataProvider;
-import 
org.apache.ignite.internal.sql.engine.framework.TestBuilders.DataProviderFactory;
-import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
-import org.apache.ignite.internal.sql.engine.util.NativeTypeValues;
-
-/**
- * {@link DataProviderFactory} that creates {@link DataProvider}s that 
generates a row of pseudo random data based on table column types
- * and then returns the same row multiple times.
- */
-final class RepeatedRandomRowDataProviderFactory implements 
DataProviderFactory {
-
-    private final int dataSize;
-
-    RepeatedRandomRowDataProviderFactory(int dataSize) {
-        this.dataSize = dataSize;
-    }
-
-    /** {@inheritDoc} **/
-    @Override
-    public DataProvider<Object[]> createDataProvider(String tableName, 
List<ColumnDescriptor> columns) {
-        Object[] row = columns.stream().map(c -> NativeTypeValues.value(1, 
c.physicalType().spec())).toArray();
-
-        return DataProvider.fromRow(row, dataSize);
-    }
-}
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/SqlBenchmark.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/SqlBenchmark.java
index ce13964d2b..10e0f410f1 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/SqlBenchmark.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/SqlBenchmark.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.sql.engine.benchmarks;
 
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 
-import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.sql.engine.framework.DataProvider;
@@ -27,7 +26,6 @@ import 
org.apache.ignite.internal.sql.engine.framework.TestBuilders;
 import org.apache.ignite.internal.sql.engine.framework.TestCluster;
 import org.apache.ignite.internal.sql.engine.framework.TestNode;
 import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
-import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.type.NativeTypes;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -66,11 +64,12 @@ public class SqlBenchmark {
             .nodes("N1", "N2", "N3")
             .addTable()
                     .name("T1")
-                    .distribution(IgniteDistributions.hash(List.of(0)))
-                    .addColumn("ID", NativeTypes.INT32)
+                    .addKeyColumn("ID", NativeTypes.INT32)
                     .addColumn("VAL", NativeTypes.stringOf(64))
-                    .defaultDataProvider(dataProvider)
                     .end()
+            .dataProvider("N1", "T1", TestBuilders.tableScan(dataProvider))
+            .dataProvider("N2", "T1", TestBuilders.tableScan(dataProvider))
+            .dataProvider("N3", "T1", TestBuilders.tableScan(dataProvider))
             .build();
     // @formatter:on
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/TpchQueries.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/TpchHelper.java
similarity index 86%
rename from 
modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/TpchQueries.java
rename to 
modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/TpchHelper.java
index 1c5945fe00..9f58157bd7 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/TpchQueries.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/TpchHelper.java
@@ -27,9 +27,9 @@ import java.nio.charset.StandardCharsets;
 /**
  * Provides utility methods to work with queries defined by the TPC-H 
benchmark.
  */
-public final class TpchQueries {
+public final class TpchHelper {
 
-    private TpchQueries() {
+    private TpchHelper() {
 
     }
 
@@ -65,8 +65,16 @@ public final class TpchQueries {
         }
     }
 
+    /**
+     * Returns a string representing DDL script with all the tables' and 
indexes' definitions
+     * required to execute queries from the TPC-H suite.
+     */
+    public static String getSchemaDefinitionScript() {
+        return loadFromResource("tpch/schema_definition_ddl.sql");
+    }
+
     private static String loadFromResource(String resource) {
-        try (InputStream is = 
TpchQueries.class.getClassLoader().getResourceAsStream(resource)) {
+        try (InputStream is = 
TpchHelper.class.getClassLoader().getResourceAsStream(resource)) {
             if (is == null) {
                 throw new IllegalArgumentException("Resource does not exist: " 
+ resource);
             }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/TpchParseBenchmark.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/TpchParseBenchmark.java
index 7392ed9074..545429a087 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/TpchParseBenchmark.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/TpchParseBenchmark.java
@@ -48,7 +48,7 @@ import org.openjdk.jmh.runner.options.OptionsBuilder;
 public class TpchParseBenchmark {
 
     /**
-     * Identifiers of TPC-H queries. See {@link TpchQueries#getQuery(String)}.
+     * Identifiers of TPC-H queries. See {@link TpchHelper#getQuery(String)}.
      */
     @Param({
             "1", "2", "3", "4", "5", "6", "7", "8", "8v", "9", "10", "11", 
"12", "12v",
@@ -61,7 +61,7 @@ public class TpchParseBenchmark {
     /** Prepares the plan of the query. */
     @Setup
     public void setUp() {
-        queryString = TpchQueries.getQuery(queryId);
+        queryString = TpchHelper.getQuery(queryId);
     }
 
     /**
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/TpchPrepareBenchmark.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/TpchPrepareBenchmark.java
index 424f21ba7a..65c15eda31 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/TpchPrepareBenchmark.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/TpchPrepareBenchmark.java
@@ -48,12 +48,12 @@ import org.openjdk.jmh.runner.options.OptionsBuilder;
 @Measurement(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS)
 @BenchmarkMode(Mode.AverageTime)
 @OutputTimeUnit(TimeUnit.MILLISECONDS)
-@Fork(3)
+@Fork(1)
 @State(Scope.Benchmark)
 public class TpchPrepareBenchmark {
 
     /**
-     * Identifiers of TPC-H queries. See {@link TpchQueries#getQuery(String)}.
+     * Identifiers of TPC-H queries. See {@link TpchHelper#getQuery(String)}.
      */
     @Param({
             "1", "2", "3", "4", "5", "6", "7", "8", "8v", "9", "10", "11", 
"12", "12v",
@@ -70,15 +70,15 @@ public class TpchPrepareBenchmark {
     /** Starts the cluster and prepares the plan of the query. */
     @Setup
     public void setUp() {
-        var clusterBuilder = TestBuilders.cluster().nodes("N1");
-        TpchSchema.registerTables(clusterBuilder, 1, 10);
-
-        testCluster = clusterBuilder.build();
+        testCluster = TestBuilders.cluster().nodes("N1").build();
 
         testCluster.start();
+
         gatewayNode = testCluster.node("N1");
 
-        String query = TpchQueries.getQuery(queryId);
+        gatewayNode.initSchema(TpchHelper.getSchemaDefinitionScript());
+
+        String query = TpchHelper.getQuery(queryId);
         parsedResult = new ParserServiceImpl(0, 
EmptyCacheFactory.INSTANCE).parse(query);
     }
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/TpchSchema.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/TpchSchema.java
deleted file mode 100644
index 2f8d3c788e..0000000000
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/TpchSchema.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.sql.engine.benchmarks;
-
-import java.util.List;
-import 
org.apache.ignite.internal.sql.engine.framework.TestBuilders.ClusterBuilder;
-import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
-import org.apache.ignite.internal.type.NativeTypes;
-
-/**
- * Provides utility methods to register tables described by the TPC-H 
benchmark in a {@link ClusterBuilder}.
- */
-public final class TpchSchema {
-
-    private static final int PART_SIZE = 200_000;
-    private static final int SUPPLIER_SIZE = 10_000;
-    // TPC-H 4.2.5.1 Table 3: Estimated Database Size
-    private static final int PARTSUPP_SIZE = 80 * SUPPLIER_SIZE;
-    private static final int ORDERS_SIZE = 1_500_000;
-    // TPC-H 4.2.5.1 Table 3: Estimated Database Size
-    private static final int LINEITEM_SIZE = 4 * ORDERS_SIZE;
-    private static final int CUSTOMER_SIZE = 150_000;
-    private static final int NATION_SIZE = 25;
-    private static final int REGION_SIZE = 5;
-
-    private TpchSchema() {
-
-    }
-
-    /**
-     * Registers tables from the TPC-H benchmark in the given {@link 
ClusterBuilder cluster builder} with the scaling factor of {@code 1}.
-     *
-     * @param clusterBuilder  A cluster builder.
-     * @param dataSize The number of rows data provider is going to produce 
for each table.
-     */
-    public static void registerTables(ClusterBuilder clusterBuilder, int 
dataSize) {
-        registerTables(clusterBuilder, 1, dataSize);
-    }
-
-    /**
-     * Registers tables from the TPC-H benchmark in the given cluster with the 
given scaling factor.
-     *
-     * @param clusterBuilder  A cluster builder.
-     * @param scalingFactor Scaling factor.
-     * @param dataSize The number of rows data provider is going to produce 
for each table.
-     */
-    public static void registerTables(ClusterBuilder clusterBuilder, int 
scalingFactor, int dataSize) {
-        // Register default data provider factory that is going to generate 
pseudo random data data.
-        clusterBuilder.defaultDataProviderFactory(new 
RepeatedRandomRowDataProviderFactory(dataSize));
-
-        clusterBuilder.addTable().name("PART")
-                .addColumn("P_PARTKEY", NativeTypes.INT64)
-                .addColumn("P_NAME", NativeTypes.stringOf(55))
-                .addColumn("P_MFGR", NativeTypes.stringOf(25))
-                .addColumn("P_BRAND", NativeTypes.stringOf(10))
-                .addColumn("P_TYPE", NativeTypes.stringOf(25))
-                .addColumn("P_SIZE", NativeTypes.INT32)
-                .addColumn("P_CONTAINER", NativeTypes.stringOf(10))
-                .addColumn("P_RETAILPRICE", NativeTypes.decimalOf(15, 2))
-                .addColumn("P_COMMENT", NativeTypes.stringOf(23))
-                .distribution(IgniteDistributions.hash(List.of(0)))
-                .size(scalingFactor * PART_SIZE).end();
-
-        clusterBuilder.addTable().name("SUPPLIER")
-                .addColumn("S_SUPPKEY", NativeTypes.INT64)
-                .addColumn("S_NAME", NativeTypes.stringOf(25))
-                .addColumn("S_ADDRESS", NativeTypes.stringOf(40))
-                .addColumn("S_NATIONKEY", NativeTypes.INT64)
-                .addColumn("S_PHONE", NativeTypes.stringOf(15))
-                .addColumn("S_ACCTBAL", NativeTypes.decimalOf(15, 2))
-                .addColumn("S_COMMENT", NativeTypes.stringOf(101))
-                .distribution(IgniteDistributions.hash(List.of(0)))
-                .size(scalingFactor * SUPPLIER_SIZE).end();
-
-        clusterBuilder.addTable().name("PARTSUPP")
-                .addColumn("PS_PARTKEY", NativeTypes.INT64)
-                .addColumn("PS_SUPPKEY", NativeTypes.INT64)
-                .addColumn("PS_AVAILQTY", NativeTypes.INT32)
-                .addColumn("PS_SUPPLYCOST", NativeTypes.decimalOf(15, 2))
-                .addColumn("PS_COMMENT", NativeTypes.stringOf(199))
-                .distribution(IgniteDistributions.hash(List.of(0)))
-                .size(scalingFactor * PARTSUPP_SIZE).end();
-
-        clusterBuilder.addTable().name("CUSTOMER")
-                .addColumn("C_CUSTKEY", NativeTypes.INT64)
-                .addColumn("C_NAME", NativeTypes.stringOf(25))
-                .addColumn("C_ADDRESS", NativeTypes.stringOf(40))
-                .addColumn("C_NATIONKEY", NativeTypes.INT64)
-                .addColumn("C_PHONE", NativeTypes.stringOf(15))
-                .addColumn("C_ACCTBAL", NativeTypes.decimalOf(15, 2))
-                .addColumn("C_MKTSEGMENT", NativeTypes.stringOf(10))
-                .addColumn("C_COMMENT", NativeTypes.stringOf(117))
-                .distribution(IgniteDistributions.hash(List.of(0)))
-                .size(scalingFactor * CUSTOMER_SIZE)
-                .end();
-
-        clusterBuilder.addTable().name("ORDERS")
-                .addColumn("O_ORDERKEY", NativeTypes.INT64)
-                .addColumn("O_CUSTKEY", NativeTypes.INT64)
-                .addColumn("O_ORDERSTATUS", NativeTypes.stringOf(1))
-                .addColumn("O_TOTALPRICE", NativeTypes.decimalOf(15, 2))
-                .addColumn("O_ORDERDATE", NativeTypes.datetime(6))
-                .addColumn("O_ORDERPRIORITY", NativeTypes.stringOf(15))
-                .addColumn("O_CLERK", NativeTypes.stringOf(15))
-                .addColumn("O_SHIPPRIORITY", NativeTypes.INT32)
-                .addColumn("O_COMMENT", NativeTypes.stringOf(79))
-                .distribution(IgniteDistributions.hash(List.of(0)))
-                .size(scalingFactor * ORDERS_SIZE)
-                .end();
-
-        clusterBuilder.addTable().name("LINEITEM")
-                .addColumn("L_ORDERKEY", NativeTypes.INT64)
-                .addColumn("L_PARTKEY", NativeTypes.INT64)
-                .addColumn("L_SUPPKEY", NativeTypes.INT64)
-                .addColumn("L_LINENUMBER", NativeTypes.INT32)
-                .addColumn("L_QUANTITY", NativeTypes.decimalOf(15, 2))
-                .addColumn("L_EXTENDEDPRICE", NativeTypes.decimalOf(15, 2))
-                .addColumn("L_DISCOUNT", NativeTypes.decimalOf(15, 2))
-                .addColumn("L_TAX", NativeTypes.decimalOf(15, 2))
-                .addColumn("L_RETURNFLAG", NativeTypes.stringOf(1))
-                .addColumn("L_LINESTATUS", NativeTypes.stringOf(1))
-                .addColumn("L_SHIPDATE", NativeTypes.datetime(6))
-                .addColumn("L_COMMITDATE", NativeTypes.datetime(6))
-                .addColumn("L_RECEIPTDATE", NativeTypes.datetime(6))
-                .addColumn("L_SHIPINSTRUCT", NativeTypes.stringOf(25))
-                .addColumn("L_SHIPMODE", NativeTypes.stringOf(10))
-                .addColumn("L_COMMENT", NativeTypes.stringOf(44))
-                .distribution(IgniteDistributions.hash(List.of(0)))
-                .size(scalingFactor * LINEITEM_SIZE)
-                .end();
-
-        clusterBuilder.addTable().name("NATION")
-                .addColumn("N_NATIONKEY", NativeTypes.INT64)
-                .addColumn("N_NAME", NativeTypes.stringOf(25))
-                .addColumn("N_REGIONKEY", NativeTypes.INT64)
-                .addColumn("N_COMMENT", NativeTypes.stringOf(152))
-                .distribution(IgniteDistributions.hash(List.of(0)))
-                .size(NATION_SIZE)
-                .end();
-
-        clusterBuilder.addTable().name("REGION")
-                .addColumn("R_REGIONKEY", NativeTypes.INT64)
-                .addColumn("R_NAME", NativeTypes.stringOf(25))
-                .addColumn("N_COMMENT", NativeTypes.stringOf(152))
-                .distribution(IgniteDistributions.hash(List.of(0)))
-                .size(REGION_SIZE)
-                .end();
-
-    }
-}
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index 33652960f6..32dec0855e 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -17,45 +17,88 @@
 
 package org.apache.ignite.internal.sql.engine.framework;
 
-import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static 
org.apache.ignite.internal.sql.engine.exec.ExecutionServiceImplTest.PLANNING_TIMEOUT;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+import static org.mockito.Mockito.mock;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.BitSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Flow.Publisher;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import org.apache.ignite.internal.catalog.CatalogCommand;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.CatalogTestUtils;
+import org.apache.ignite.internal.catalog.commands.ColumnParams;
+import org.apache.ignite.internal.catalog.commands.ColumnParams.Builder;
+import org.apache.ignite.internal.catalog.commands.CreateHashIndexCommand;
+import org.apache.ignite.internal.catalog.commands.CreateSortedIndexCommand;
+import org.apache.ignite.internal.catalog.commands.CreateTableCommand;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import org.apache.ignite.internal.catalog.descriptors.CatalogColumnCollation;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.sql.engine.exec.ExecutableTable;
+import org.apache.ignite.internal.sql.engine.exec.ExecutableTableRegistry;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.LifecycleAware;
+import org.apache.ignite.internal.sql.engine.exec.NodeWithTerm;
+import org.apache.ignite.internal.sql.engine.exec.PartitionWithTerm;
 import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
+import org.apache.ignite.internal.sql.engine.exec.ScannableTable;
 import org.apache.ignite.internal.sql.engine.exec.TxAttributes;
+import org.apache.ignite.internal.sql.engine.exec.UpdatableTable;
+import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
+import org.apache.ignite.internal.sql.engine.exec.exp.RangeCondition;
 import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
 import 
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory;
 import 
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetProvider;
 import org.apache.ignite.internal.sql.engine.exec.mapping.FragmentDescription;
 import org.apache.ignite.internal.sql.engine.exec.mapping.MappingServiceImpl;
+import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl;
+import 
org.apache.ignite.internal.sql.engine.prepare.ddl.DdlSqlToCommandConverter;
 import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
 import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptorImpl;
 import org.apache.ignite.internal.sql.engine.schema.DefaultValueStrategy;
 import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
 import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Collation;
-import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
 import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView;
 import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
+import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManagerImpl;
 import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
 import org.apache.ignite.internal.sql.engine.schema.TableDescriptorImpl;
+import org.apache.ignite.internal.sql.engine.sql.ParserServiceImpl;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
 import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
+import org.apache.ignite.internal.sql.engine.util.EmptyCacheFactory;
+import org.apache.ignite.internal.sql.engine.util.cache.CaffeineCacheFactory;
+import org.apache.ignite.internal.type.BitmaskNativeType;
+import org.apache.ignite.internal.type.DecimalNativeType;
 import org.apache.ignite.internal.type.NativeType;
+import org.apache.ignite.internal.type.NativeTypeSpec;
+import org.apache.ignite.internal.type.NumberNativeType;
+import org.apache.ignite.internal.type.TemporalNativeType;
+import org.apache.ignite.internal.type.VarlenNativeType;
+import org.apache.ignite.internal.util.SubscriptionUtils;
+import org.apache.ignite.internal.util.TransformingIterator;
+import org.apache.ignite.internal.util.subscription.TransformingPublisher;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.NetworkAddress;
 import org.jetbrains.annotations.Nullable;
@@ -64,10 +107,6 @@ import org.jetbrains.annotations.Nullable;
  * A collection of builders to create test objects.
  */
 public class TestBuilders {
-
-    /** Schema version. */
-    public static final int SCHEMA_VERSION = -1;
-
     /** Returns a builder of the test cluster object. */
     public static ClusterBuilder cluster() {
         return new ClusterBuilderImpl();
@@ -88,6 +127,112 @@ public class TestBuilders {
         return new ClusterServiceFactory(nodes);
     }
 
+    /**
+     * Factory method to create a {@link ScannableTable table} instance from the 
given data provider that
+     * implements only {@link ScannableTable#scan table scan}.
+     */
+    public static ScannableTable tableScan(DataProvider<Object[]> 
dataProvider) {
+        return new ScannableTable() {
+            @Override
+            public <RowT> Publisher<RowT> scan(ExecutionContext<RowT> ctx, 
PartitionWithTerm partWithTerm, RowFactory<RowT> rowFactory,
+                    @Nullable BitSet requiredColumns) {
+
+                return new TransformingPublisher<>(
+                        SubscriptionUtils.fromIterable(
+                                () -> new TransformingIterator<>(
+                                        dataProvider.iterator(),
+                                        row -> project(row, requiredColumns)
+                                )
+                        ),
+                        rowFactory::create
+                );
+            }
+
+            @Override
+            public <RowT> Publisher<RowT> 
indexRangeScan(ExecutionContext<RowT> ctx, PartitionWithTerm partWithTerm,
+                    RowFactory<RowT> rowFactory, int indexId, List<String> 
columns, @Nullable RangeCondition<RowT> cond,
+                    @Nullable BitSet requiredColumns) {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public <RowT> Publisher<RowT> indexLookup(ExecutionContext<RowT> 
ctx, PartitionWithTerm partWithTerm,
+                    RowFactory<RowT> rowFactory, int indexId, List<String> 
columns, RowT key, @Nullable BitSet requiredColumns) {
+                throw new UnsupportedOperationException();
+            }
+        };
+    }
+
+    /**
+     * Factory method to create a {@link ScannableTable table} instance from the 
given data provider that
+     * implements only {@link ScannableTable#indexRangeScan index range scan}.
+     */
+    public static ScannableTable indexRangeScan(DataProvider<Object[]> 
dataProvider) {
+        return new ScannableTable() {
+            @Override
+            public <RowT> Publisher<RowT> scan(ExecutionContext<RowT> ctx, 
PartitionWithTerm partWithTerm, RowFactory<RowT> rowFactory,
+                    @Nullable BitSet requiredColumns) {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public <RowT> Publisher<RowT> 
indexRangeScan(ExecutionContext<RowT> ctx, PartitionWithTerm partWithTerm,
+                    RowFactory<RowT> rowFactory, int indexId, List<String> 
columns, @Nullable RangeCondition<RowT> cond,
+                    @Nullable BitSet requiredColumns) {
+                return new TransformingPublisher<>(
+                        SubscriptionUtils.fromIterable(
+                                () -> new TransformingIterator<>(
+                                        dataProvider.iterator(),
+                                        row -> project(row, requiredColumns)
+                                )
+                        ),
+                        rowFactory::create
+                );
+            }
+
+            @Override
+            public <RowT> Publisher<RowT> indexLookup(ExecutionContext<RowT> 
ctx, PartitionWithTerm partWithTerm,
+                    RowFactory<RowT> rowFactory, int indexId, List<String> 
columns, RowT key, @Nullable BitSet requiredColumns) {
+                throw new UnsupportedOperationException();
+            }
+        };
+    }
+
+    /**
+     * Factory method to create a {@link ScannableTable table} instance from the 
given data provider that
+     * implements only {@link ScannableTable#indexLookup index lookup}.
+     */
+    public static ScannableTable indexLookup(DataProvider<Object[]> 
dataProvider) {
+        return new ScannableTable() {
+            @Override
+            public <RowT> Publisher<RowT> scan(ExecutionContext<RowT> ctx, 
PartitionWithTerm partWithTerm, RowFactory<RowT> rowFactory,
+                    @Nullable BitSet requiredColumns) {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public <RowT> Publisher<RowT> 
indexRangeScan(ExecutionContext<RowT> ctx, PartitionWithTerm partWithTerm,
+                    RowFactory<RowT> rowFactory, int indexId, List<String> 
columns, @Nullable RangeCondition<RowT> cond,
+                    @Nullable BitSet requiredColumns) {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public <RowT> Publisher<RowT> indexLookup(ExecutionContext<RowT> 
ctx, PartitionWithTerm partWithTerm,
+                    RowFactory<RowT> rowFactory, int indexId, List<String> 
columns, RowT key, @Nullable BitSet requiredColumns) {
+                return new TransformingPublisher<>(
+                        SubscriptionUtils.fromIterable(
+                                () -> new TransformingIterator<>(
+                                        dataProvider.iterator(),
+                                        row -> project(row, requiredColumns)
+                                )
+                        ),
+                        rowFactory::create
+                );
+            }
+        };
+    }
+
     /**
      * A builder to create a test cluster object.
      *
@@ -112,22 +257,21 @@ public class TestBuilders {
         ClusterTableBuilder addTable();
 
         /**
-         * When specified the given factory is used to create instances of
-         * {@link ClusterTableBuilder#defaultDataProvider(DataProvider) 
default data providers} for tables that have no
-         * {@link ClusterTableBuilder#defaultDataProvider(DataProvider) 
default data provider} set.
-         *
-         * <p>Note: when a table has default data provider this method has no 
effect.
+         * Builds the cluster object.
          *
-         * @return {@code this} for chaining.
+         * @return Created cluster object.
          */
-        ClusterBuilder defaultDataProviderFactory(DataProviderFactory 
dataProviderFactory);
+        TestCluster build();
 
         /**
-         * Builds the cluster object.
+         * Registers the table implementation to be used on the given node for the table with the given name.
          *
-         * @return Created cluster object.
+         * @param nodeName Name of the node given instance of table will be 
assigned to.
+         * @param tableName Name of the table given instance represents.
+         * @param table Actual table that will be used for read operations 
during execution.
+         * @return {@code this} for chaining.
          */
-        TestCluster build();
+        ClusterBuilder dataProvider(String nodeName, String tableName, 
ScannableTable table);
     }
 
     /**
@@ -137,17 +281,23 @@ public class TestBuilders {
      */
     public interface TableBuilder extends TableBuilderBase<TableBuilder> {
         /** Returns a builder of the test sorted-index object. */
-        public SortedIndexBuilder sortedIndex();
+        SortedIndexBuilder sortedIndex();
 
         /** Returns a builder of the test hash-index object. */
-        public HashIndexBuilder hashIndex();
+        HashIndexBuilder hashIndex();
+
+        /** Sets the distribution of the table. */
+        TableBuilder distribution(IgniteDistribution distribution);
+
+        /** Sets the size of the table. */
+        TableBuilder size(int size);
 
         /**
          * Builds a table.
          *
          * @return Created table object.
          */
-        public TestTable build();
+        TestTable build();
     }
 
     /**
@@ -173,7 +323,6 @@ public class TestBuilders {
      * @see TestCluster
      */
     public interface ClusterTableBuilder extends 
TableBuilderBase<ClusterTableBuilder>,
-            DataSourceBuilder<ClusterTableBuilder>,
             NestedBuilder<ClusterBuilder> {
 
         /**
@@ -198,7 +347,6 @@ public class TestBuilders {
      * @see TestCluster
      */
     public interface ClusterSortedIndexBuilder extends 
SortedIndexBuilderBase<ClusterSortedIndexBuilder>,
-            DataSourceBuilder<ClusterSortedIndexBuilder>,
             NestedBuilder<ClusterTableBuilder> {
     }
 
@@ -209,25 +357,9 @@ public class TestBuilders {
      * @see TestCluster
      */
     public interface ClusterHashIndexBuilder extends 
HashIndexBuilderBase<ClusterHashIndexBuilder>,
-            DataSourceBuilder<ClusterHashIndexBuilder>,
             NestedBuilder<ClusterTableBuilder> {
     }
 
-    /**
-     * A builder interface to enrich a builder object with data-source related 
fields.
-     */
-    public interface DataSourceBuilder<ChildT> {
-        /**
-         * Adds a default data provider, which will be used for those nodes 
for which no specific provider is specified.
-         *
-         * <p>Note: this method will force all nodes in the cluster to have a 
data provider for the given object.
-         */
-        ChildT defaultDataProvider(DataProvider<?> dataProvider);
-
-        /** Adds a data provider for the given node to the data source object. 
*/
-        ChildT addDataProvider(String targetNode, DataProvider<?> 
dataProvider);
-    }
-
     /**
      * A builder to create an execution context.
      *
@@ -312,8 +444,8 @@ public class TestBuilders {
 
     private static class ClusterBuilderImpl implements ClusterBuilder {
         private final List<ClusterTableBuilderImpl> tableBuilders = new 
ArrayList<>();
-        private DataProviderFactory dataProviderFactory;
         private List<String> nodeNames;
+        private final Map<String, Map<String, ScannableTable>> 
nodeName2tableName2table = new HashMap<>();
 
         /** {@inheritDoc} */
         @Override
@@ -332,48 +464,57 @@ public class TestBuilders {
             return new ClusterTableBuilderImpl(this);
         }
 
-        /** {@inheritDoc} */
         @Override
-        public ClusterBuilder defaultDataProviderFactory(DataProviderFactory 
dataProviderFactory) {
-            this.dataProviderFactory = dataProviderFactory;
+        public ClusterBuilder dataProvider(String nodeName, String tableName, 
ScannableTable table) {
+            nodeName2tableName2table.computeIfAbsent(nodeName, key -> new 
HashMap<>()).put(tableName, table);
+
             return this;
         }
 
         /** {@inheritDoc} */
         @Override
         public TestCluster build() {
+            validateConfiguredDataProviders();
+
             var clusterService = new ClusterServiceFactory(nodeNames);
 
-            Map<String, Map<String, DataProvider<?>>> dataProvidersByTableName 
= new HashMap<>();
-            for (ClusterTableBuilderImpl tableBuilder : tableBuilders) {
-                validateDataSourceBuilder(tableBuilder);
-                injectDefaultDataProvidersIfNeeded(tableBuilder);
-                injectDataProvidersIfNeeded(tableBuilder);
+            var clusterName = "test_cluster";
 
-                for (AbstractIndexBuilderImpl<?> indexBuilder : 
tableBuilder.indexBuilders) {
-                    validateDataSourceBuilder(indexBuilder);
-                    injectDataProvidersIfNeeded(indexBuilder);
-                }
+            CatalogManager catalogManager = 
CatalogTestUtils.createCatalogManagerWithTestUpdateLog(clusterName, new 
HybridClockImpl());
+
+            var parserService = new ParserServiceImpl(0, 
EmptyCacheFactory.INSTANCE);
+            var prepareService = new PrepareServiceImpl(clusterName, 0, 
CaffeineCacheFactory.INSTANCE,
+                    new DdlSqlToCommandConverter(Map.of(), () -> "aipersist"), 
PLANNING_TIMEOUT, mock(MetricManager.class));
 
-                dataProvidersByTableName.put(tableBuilder.name, 
tableBuilder.dataProviders);
+            Map<String, List<String>> owningNodesByTableName = new HashMap<>();
+            for (Entry<String, Map<String, ScannableTable>> entry : 
nodeName2tableName2table.entrySet()) {
+                for (String tableName : entry.getValue().keySet()) {
+                    owningNodesByTableName.computeIfAbsent(tableName, key -> 
new ArrayList<>()).add(entry.getKey());
+                }
             }
 
-            Map<String, IgniteTable> tableByName = tableBuilders.stream()
-                    .map(ClusterTableBuilderImpl::build)
-                    .collect(Collectors.toMap(TestTable::name, 
Function.identity()));
+            List<CatalogCommand> initialSchema = tableBuilders.stream()
+                    .flatMap(builder -> builder.build().stream())
+                    .collect(Collectors.toList());
+
+            Runnable initClosure = () -> 
await(catalogManager.execute(initialSchema));
 
-            IgniteSchema schema = new IgniteSchema(DEFAULT_SCHEMA_NAME, 
SCHEMA_VERSION, tableByName.values());
-            var schemaManager = new PredefinedSchemaManager(schema);
+            var ddlHandler = new DdlCommandHandler(catalogManager);
+            var schemaManager = new SqlSchemaManagerImpl(catalogManager, 
CaffeineCacheFactory.INSTANCE, 0);
             var targetProvider = new ExecutionTargetProvider() {
                 @Override
                 public CompletableFuture<ExecutionTarget> 
forTable(ExecutionTargetFactory factory, IgniteTable table) {
-                    Map<String, DataProvider<?>> dataProviders = 
dataProvidersByTableName.get(table.name());
+                    List<String> owningNodes = 
owningNodesByTableName.get(table.name());
 
-                    if (nullOrEmpty(dataProviders)) {
+                    if (nullOrEmpty(owningNodes)) {
                         throw new AssertionError("DataProvider is not 
configured for table " + table.name());
                     }
 
-                    return 
CompletableFuture.completedFuture(factory.allOf(List.copyOf(dataProviders.keySet())));
+                    ExecutionTarget target = 
factory.partitioned(owningNodes.stream()
+                            .map(name -> new NodeWithTerm(name, 1))
+                            .collect(Collectors.toList()));
+
+                    return CompletableFuture.completedFuture(target);
                 }
 
                 @Override
@@ -393,47 +534,65 @@ public class TestBuilders {
                         mappingService.onTopologyLeap(new 
LogicalTopologySnapshot(1L, logicalNodes));
 
                         return new TestNode(
-                                name, clusterService.forNode(name), 
schemaManager, mappingService
+                                name,
+                                clusterService.forNode(name),
+                                parserService,
+                                prepareService,
+                                schemaManager,
+                                mappingService,
+                                new 
TestExecutableTableRegistry(nodeName2tableName2table.get(name), schemaManager),
+                                ddlHandler
                         );
                     })
                     .collect(Collectors.toMap(TestNode::name, 
Function.identity()));
 
-            return new TestCluster(nodes);
+            return new TestCluster(
+                    nodes,
+                    List.of(new 
ComponentToLifecycleAwareAdaptor(catalogManager), prepareService),
+                    initClosure
+            );
         }
 
-        private void 
validateDataSourceBuilder(AbstractDataSourceBuilderImpl<?> tableBuilder) {
-            Set<String> tableOwners = new 
HashSet<>(tableBuilder.dataProviders.keySet());
+        private void validateConfiguredDataProviders() {
+            Set<String> dataProvidersOwners = new 
HashSet<>(nodeName2tableName2table.keySet());
 
-            tableOwners.removeAll(nodeNames);
+            dataProvidersOwners.removeAll(Set.copyOf(nodeNames));
 
-            if (!tableOwners.isEmpty()) {
-                throw new AssertionError(format("The table has a dataProvider 
that is outside the cluster "
-                        + "[tableName={}, outsiders={}]", tableBuilder.name, 
tableOwners));
-            }
-        }
+            if (!dataProvidersOwners.isEmpty()) {
+                Map<String, List<String>> problematicTables = new HashMap<>();
+
+                for (String outsiderNode : dataProvidersOwners) {
+                    for (String problematicTable : 
nodeName2tableName2table.get(outsiderNode).keySet()) {
+                        problematicTables.computeIfAbsent(problematicTable, k 
-> new ArrayList<>()).add(outsiderNode);
+                    }
+                }
+
+                String problematicTablesString = 
problematicTables.entrySet().stream()
+                        .map(e -> e.getKey() + ": " + e.getValue())
+                        .collect(Collectors.joining(", "));
 
-        private void 
injectDefaultDataProvidersIfNeeded(ClusterTableBuilderImpl tableBuilder) {
-            if (tableBuilder.defaultDataProvider == null && 
dataProviderFactory != null) {
-                tableBuilder.defaultDataProvider = 
dataProviderFactory.createDataProvider(tableBuilder.name, tableBuilder.columns);
+                throw new AssertionError(format("The table has a dataProvider 
that is outside the cluster "
+                        + "[{}]", problematicTablesString));
             }
         }
+    }
 
-        private void 
injectDataProvidersIfNeeded(AbstractDataSourceBuilderImpl<?> builder) {
-            if (builder.defaultDataProvider == null) {
-                return;
-            }
+    private static class TableBuilderImpl implements TableBuilder {
+        private final List<AbstractTableIndexBuilderImpl<?>> indexBuilders = 
new ArrayList<>();
+        private final List<ColumnDescriptor> columns = new ArrayList<>();
 
-            Set<String> nodesWithoutDataProvider = new HashSet<>(nodeNames);
+        private String name;
+        private IgniteDistribution distribution;
+        private int size = 100_000;
 
-            nodesWithoutDataProvider.removeAll(builder.dataProviders.keySet());
+        /** {@inheritDoc} */
+        @Override
+        public TableBuilder name(String name) {
+            this.name = name;
 
-            for (String name : nodesWithoutDataProvider) {
-                builder.addDataProvider(name, builder.defaultDataProvider);
-            }
+            return this;
         }
-    }
 
-    private static class TableBuilderImpl extends 
AbstractTableBuilderImpl<TableBuilder> implements TableBuilder {
         /** {@inheritDoc} */
         @Override
         public SortedIndexBuilder sortedIndex() {
@@ -446,6 +605,63 @@ public class TestBuilders {
             return new HashIndexBuilderImpl(this);
         }
 
+        /** {@inheritDoc} */
+        @Override
+        public TableBuilder distribution(IgniteDistribution distribution) {
+            this.distribution = distribution;
+
+            return this;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public TableBuilder addColumn(String name, NativeType type, boolean 
nullable) {
+            columns.add(new ColumnDescriptorImpl(
+                    name, false, nullable, columns.size(), type, 
DefaultValueStrategy.DEFAULT_NULL, null
+            ));
+
+            return this;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public TableBuilder addColumn(String name, NativeType type) {
+            return addColumn(name, type, true);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public TableBuilder addColumn(String name, NativeType type, @Nullable Object defaultValue) {
+            // Without an explicit default the column is registered through the plain two-argument overload.
+            if (defaultValue == null) {
+                return addColumn(name, type);
+            }
+
+            columns.add(new ColumnDescriptorImpl(
+                    name, false, true, columns.size(), type, DefaultValueStrategy.DEFAULT_CONSTANT, () -> defaultValue
+            ));
+
+            return this;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public TableBuilder addKeyColumn(String name, NativeType type) {
+            columns.add(new ColumnDescriptorImpl(
+                    name, true, false, columns.size(), type, 
DefaultValueStrategy.DEFAULT_NULL, null
+            ));
+
+            return this;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public TableBuilder size(int size) {
+            this.size = size;
+
+            return this;
+        }
+
         /** {@inheritDoc} */
         @Override
         public TestTable build() {
@@ -474,21 +690,30 @@ public class TestBuilders {
                     indexes
             );
         }
-
-        /** {@inheritDoc} */
-        @Override
-        protected TableBuilder self() {
-            return this;
-        }
     }
 
-    private static class ClusterTableBuilderImpl extends 
AbstractTableBuilderImpl<ClusterTableBuilder> implements ClusterTableBuilder {
+    private static class ClusterTableBuilderImpl implements 
ClusterTableBuilder {
+        private final List<AbstractClusterTableIndexBuilderImpl<?>> 
indexBuilders = new ArrayList<>();
+
+        private final List<ColumnParams> columns = new ArrayList<>();
+        private final List<String> keyColumns = new ArrayList<>();
+
         private final ClusterBuilderImpl parent;
 
+        private String name;
+
         private ClusterTableBuilderImpl(ClusterBuilderImpl parent) {
             this.parent = parent;
         }
 
+        /** {@inheritDoc} */
+        @Override
+        public ClusterTableBuilder name(String name) {
+            this.name = name;
+
+            return this;
+        }
+
         /** {@inheritDoc} */
         @Override
         public ClusterSortedIndexBuilder addSortedIndex() {
@@ -501,12 +726,33 @@ public class TestBuilders {
             return new ClusterHashIndexBuilderImpl(this);
         }
 
+        @Override
+        public ClusterTableBuilder addColumn(String name, NativeType type, 
boolean nullable) {
+            columns.add(columnParams(name, type, nullable, null));
+
+            return this;
+        }
+
         /** {@inheritDoc} */
         @Override
-        protected ClusterTableBuilder self() {
+        public ClusterTableBuilder addColumn(String name, NativeType type) {
+            return addColumn(name, type, true);
+        }
+
+        @Override
+        public ClusterTableBuilder addColumn(String name, NativeType type, 
@Nullable Object defaultValue) {
+            columns.add(columnParams(name, type, true, defaultValue));
+
             return this;
         }
 
+        @Override
+        public ClusterTableBuilder addKeyColumn(String name, NativeType type) {
+            keyColumns.add(name);
+
+            return addColumn(name, type, false);
+        }
+
         /** {@inheritDoc} */
         @Override
         public ClusterBuilder end() {
@@ -515,18 +761,27 @@ public class TestBuilders {
             return parent;
         }
 
-        private TestTable build() {
-            TableDescriptorImpl tableDescriptor = new 
TableDescriptorImpl(columns, distribution);
+        private List<CatalogCommand> build() {
+            List<CatalogCommand> commands = new ArrayList<>(1 + 
indexBuilders.size());
 
-            List<IgniteIndex> indexes = indexBuilders.stream()
-                    .map(idx -> idx.build(tableDescriptor))
-                    .collect(Collectors.toList());
+            commands.add(
+                    CreateTableCommand.builder()
+                            .schemaName("PUBLIC")
+                            .tableName(name)
+                            .columns(columns)
+                            .primaryKeyColumns(keyColumns)
+                            .build()
+            );
+
+            for (AbstractClusterTableIndexBuilderImpl<?> builder : 
indexBuilders) {
+                commands.add(builder.build("PUBLIC", name));
+            }
 
-            return new TestTable(tableDescriptor, name, size, indexes, 
dataProviders);
+            return commands;
         }
     }
 
-    private static class SortedIndexBuilderImpl extends 
AbstractIndexBuilderImpl<SortedIndexBuilder>
+    private static class SortedIndexBuilderImpl extends 
AbstractTableIndexBuilderImpl<SortedIndexBuilder>
             implements SortedIndexBuilder {
         private final TableBuilderImpl parent;
 
@@ -563,11 +818,11 @@ public class TestBuilders {
                 throw new IllegalArgumentException("Collation must be 
specified for each of columns.");
             }
 
-            return TestIndex.createSorted(name, columns, collations, desc, 
dataProviders);
+            return TestIndex.createSorted(name, columns, collations, desc);
         }
     }
 
-    private static class HashIndexBuilderImpl extends 
AbstractIndexBuilderImpl<HashIndexBuilder> implements HashIndexBuilder {
+    private static class HashIndexBuilderImpl extends 
AbstractTableIndexBuilderImpl<HashIndexBuilder> implements HashIndexBuilder {
         private final TableBuilderImpl parent;
 
         private HashIndexBuilderImpl(TableBuilderImpl parent) {
@@ -601,11 +856,11 @@ public class TestBuilders {
 
             assert collations == null : "Collation is not supported.";
 
-            return TestIndex.createHash(name, columns, desc, dataProviders);
+            return TestIndex.createHash(name, columns, desc);
         }
     }
 
-    private static class ClusterSortedIndexBuilderImpl extends 
AbstractIndexBuilderImpl<ClusterSortedIndexBuilder>
+    private static class ClusterSortedIndexBuilderImpl extends 
AbstractClusterTableIndexBuilderImpl<ClusterSortedIndexBuilder>
             implements ClusterSortedIndexBuilder {
         private final ClusterTableBuilderImpl parent;
 
@@ -628,14 +883,24 @@ public class TestBuilders {
         }
 
         @Override
-        TestIndex build(TableDescriptor desc) {
+        CatalogCommand build(String schemaName, String tableName) {
             assert collations.size() == columns.size();
 
-            return TestIndex.createSorted(name, columns, collations, desc, 
dataProviders);
+            List<CatalogColumnCollation> catalogCollations = 
collations.stream()
+                    .map(c -> CatalogColumnCollation.get(c.asc, c.nullsFirst))
+                    .collect(Collectors.toList());
+
+            return CreateSortedIndexCommand.builder()
+                    .schemaName(schemaName)
+                    .tableName(tableName)
+                    .indexName(name)
+                    .columns(columns)
+                    .collations(catalogCollations)
+                    .build();
         }
     }
 
-    private static class ClusterHashIndexBuilderImpl extends 
AbstractIndexBuilderImpl<ClusterHashIndexBuilder>
+    private static class ClusterHashIndexBuilderImpl extends 
AbstractClusterTableIndexBuilderImpl<ClusterHashIndexBuilder>
             implements ClusterHashIndexBuilder {
         private final ClusterTableBuilderImpl parent;
 
@@ -658,36 +923,20 @@ public class TestBuilders {
         }
 
         @Override
-        TestIndex build(TableDescriptor desc) {
-            assert collations == null;
-
-            return TestIndex.createHash(name, columns, desc, dataProviders);
+        CatalogCommand build(String schemaName, String tableName) {
+            return CreateHashIndexCommand.builder()
+                    .schemaName(schemaName)
+                    .tableName(tableName)
+                    .indexName(name)
+                    .columns(columns)
+                    .build();
         }
     }
 
-    /**
-     * A factory that creates {@link DataProvider data providers}.
-     */
-    @FunctionalInterface
-    public interface DataProviderFactory {
-
-        /**
-         * Creates a {@link DataProvider} for the given table.
-         *
-         * @param tableName a table name.
-         * @param columns a list of columns.
-         * @return an instance of {@link DataProvider}.
-         */
-        DataProvider<Object[]> createDataProvider(String tableName, 
List<ColumnDescriptor> columns);
-    }
-
-    private abstract static class AbstractTableBuilderImpl<ChildT> extends 
AbstractDataSourceBuilderImpl<ChildT>
-            implements TableBuilderBase<ChildT> {
-        protected final List<ColumnDescriptor> columns = new ArrayList<>();
-        protected final List<AbstractIndexBuilderImpl> indexBuilders = new 
ArrayList<>();
-
-        protected IgniteDistribution distribution;
-        protected int size = 100_000;
+    private abstract static class AbstractIndexBuilderImpl<ChildT> implements 
SortedIndexBuilderBase<ChildT>, HashIndexBuilderBase<ChildT> {
+        String name;
+        final List<String> columns = new ArrayList<>();
+        List<Collation> collations;
 
         /** {@inheritDoc} */
         @Override
@@ -697,68 +946,6 @@ public class TestBuilders {
             return self();
         }
 
-        /** {@inheritDoc} */
-        @Override
-        public ChildT distribution(IgniteDistribution distribution) {
-            this.distribution = distribution;
-
-            return self();
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public ChildT addKeyColumn(String name, NativeType type) {
-            columns.add(new ColumnDescriptorImpl(
-                    name, true, false, columns.size(), type, 
DefaultValueStrategy.DEFAULT_NULL, null
-            ));
-
-            return self();
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public ChildT addColumn(String name, NativeType type) {
-            return addColumn(name, type, true);
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public ChildT addColumn(String name, NativeType type, boolean 
nullable) {
-            columns.add(new ColumnDescriptorImpl(
-                    name, false, nullable, columns.size(), type, 
DefaultValueStrategy.DEFAULT_NULL, null
-            ));
-
-            return self();
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public ChildT addColumn(String name, NativeType type, @Nullable Object 
defaultValue) {
-            if (defaultValue == null) {
-                return addColumn(name, type);
-            } else {
-                ColumnDescriptorImpl desc = new ColumnDescriptorImpl(
-                        name, false, true, columns.size(), type, 
DefaultValueStrategy.DEFAULT_CONSTANT, () -> defaultValue
-                );
-                columns.add(desc);
-            }
-            return self();
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public ChildT size(int size) {
-            this.size = size;
-
-            return self();
-        }
-    }
-
-    private abstract static class AbstractIndexBuilderImpl<ChildT> extends 
AbstractDataSourceBuilderImpl<ChildT>
-            implements SortedIndexBuilderBase<ChildT>, 
HashIndexBuilderBase<ChildT> {
-        protected final List<String> columns = new ArrayList<>();
-        protected List<Collation> collations;
-
         /** {@inheritDoc} */
         @Override
         public ChildT addColumn(String columnName) {
@@ -780,34 +967,15 @@ public class TestBuilders {
             return self();
         }
 
-        abstract TestIndex build(TableDescriptor desc);
-    }
-
-    private abstract static class AbstractDataSourceBuilderImpl<ChildT> {
-
-        protected String name;
-        final Map<String, DataProvider<?>> dataProviders = new HashMap<>();
-        DataProvider<?> defaultDataProvider = null;
-
         abstract ChildT self();
+    }
 
-        public ChildT name(String name) {
-            this.name = name;
-
-            return self();
-        }
-
-        public ChildT defaultDataProvider(DataProvider<?> dataProvider) {
-            this.defaultDataProvider = dataProvider;
-
-            return self();
-        }
-
-        public ChildT addDataProvider(String targetNode, DataProvider<?> 
dataProvider) {
-            this.dataProviders.put(targetNode, dataProvider);
+    private abstract static class AbstractTableIndexBuilderImpl<ChildT> 
extends AbstractIndexBuilderImpl<ChildT> {
+        abstract TestIndex build(TableDescriptor desc);
+    }
 
-            return self();
-        }
+    private abstract static class AbstractClusterTableIndexBuilderImpl<ChildT> 
extends AbstractIndexBuilderImpl<ChildT> {
+        abstract CatalogCommand build(String schemaName, String tableName);
     }
 
     /**
@@ -823,9 +991,6 @@ public class TestBuilders {
         /** Sets the name of the table. */
         ChildT name(String name);
 
-        /** Sets the distribution of the table. */
-        ChildT distribution(IgniteDistribution distribution);
-
         /** Adds a key column to the table. */
         ChildT addKeyColumn(String name, NativeType type);
 
@@ -837,9 +1002,6 @@ public class TestBuilders {
 
         /** Adds a column with the given default value to the table. */
         ChildT addColumn(String name, NativeType type, @Nullable Object 
defaultValue);
-
-        /** Sets the size of the table. */
-        ChildT size(int size);
     }
 
     /**
@@ -911,4 +1073,131 @@ public class TestBuilders {
          */
         ParentT end();
     }
+
+    /** Adapts an {@link IgniteComponent} to the {@link LifecycleAware} contract by forwarding start and stop calls to it. */
+    private static class ComponentToLifecycleAwareAdaptor implements LifecycleAware {
+        private final IgniteComponent delegate;
+
+        ComponentToLifecycleAwareAdaptor(IgniteComponent delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        public void start() {
+            delegate.start();
+        }
+
+        @Override
+        public void stop() throws Exception {
+            delegate.stop();
+        }
+    }
+
+    /**
+     * Test implementation of {@link ExecutableTableRegistry} that serves read-only tables from a predefined
+     * name-to-table mapping. Tables are resolved through the given schema manager; updates are not supported.
+     */
+    private static class TestExecutableTableRegistry implements ExecutableTableRegistry {
+        private final Map<String, ScannableTable> tablesByName;
+        private final SqlSchemaManager schemaManager;
+
+        /**
+         * Constructor.
+         *
+         * @param tablesByName Tables available on the node keyed by table name, or {@code null} if nothing was configured for the node.
+         * @param schemaManager Schema manager used to resolve a table by schema version and table id.
+         */
+        TestExecutableTableRegistry(@Nullable Map<String, ScannableTable> tablesByName, SqlSchemaManager schemaManager) {
+            // The cluster builder passes the result of a plain Map.get(), which is null for a node no table was registered for.
+            this.tablesByName = tablesByName == null ? Map.of() : tablesByName;
+            this.schemaManager = schemaManager;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public CompletableFuture<ExecutableTable> getTable(int schemaVersion, int tableId) {
+            IgniteTable table = schemaManager.table(schemaVersion, tableId);
+
+            assert table != null : "Table not found [schemaVersion=" + schemaVersion + ", tableId=" + tableId + ']';
+
+            return CompletableFuture.completedFuture(new ExecutableTable() {
+                @Override
+                public ScannableTable scannableTable() {
+                    ScannableTable scannableTable = tablesByName.get(table.name());
+
+                    assert scannableTable != null : "ScannableTable is not configured for table " + table.name();
+
+                    return scannableTable;
+                }
+
+                @Override
+                public UpdatableTable updatableTable() {
+                    throw new UnsupportedOperationException();
+                }
+
+                @Override
+                public TableDescriptor tableDescriptor() {
+                    return table.descriptor();
+                }
+            });
+        }
+    }
+
+    /**
+     * Converts a column definition expressed in terms of {@link NativeType} to catalog {@link ColumnParams}.
+     *
+     * <p>Precision, scale and length are copied from the native type for those type specs that carry them.
+     *
+     * @param name Name of the column.
+     * @param type Native type of the column.
+     * @param nullable Value passed to the builder as the nullability of the column.
+     * @param defaultValue Constant to use as the default value of the column.
+     * @return Column parameters.
+     * @throws IllegalArgumentException If the type spec is not handled by this method.
+     */
+    private static ColumnParams columnParams(String name, NativeType type, boolean nullable, @Nullable Object defaultValue) {
+        NativeTypeSpec spec = type.spec();
+
+        Builder params = ColumnParams.builder()
+                .name(name)
+                .type(spec.asColumnType())
+                .nullable(nullable)
+                .defaultValue(DefaultValue.constant(defaultValue));
+
+        switch (spec) {
+            case DECIMAL: {
+                assert type instanceof DecimalNativeType : type.getClass().getCanonicalName();
+
+                DecimalNativeType decimalType = (DecimalNativeType) type;
+
+                params.precision(decimalType.precision());
+                params.scale(decimalType.scale());
+                break;
+            }
+            case NUMBER: {
+                assert type instanceof NumberNativeType : type.getClass().getCanonicalName();
+
+                NumberNativeType numberType = (NumberNativeType) type;
+
+                params.precision(numberType.precision());
+                break;
+            }
+            case TIME:
+            case DATETIME:
+            case TIMESTAMP: {
+                assert type instanceof TemporalNativeType : type.getClass().getCanonicalName();
+
+                TemporalNativeType temporalType = (TemporalNativeType) type;
+
+                params.precision(temporalType.precision());
+                break;
+            }
+            case STRING:
+            case BYTES: {
+                assert type instanceof VarlenNativeType : type.getClass().getCanonicalName();
+
+                VarlenNativeType varlenType = (VarlenNativeType) type;
+
+                params.length(varlenType.length());
+                break;
+            }
+            case BITMASK: {
+                assert type instanceof BitmaskNativeType : type.getClass().getCanonicalName();
+
+                BitmaskNativeType bitmaskType = (BitmaskNativeType) type;
+
+                params.length(bitmaskType.bits());
+                break;
+            }
+            case INT8:
+            case INT16:
+            case INT32:
+            case INT64:
+            case FLOAT:
+            case DOUBLE:
+            case DATE:
+            case UUID:
+            case BOOLEAN:
+                // These types need nothing beyond the common attributes set above.
+                break;
+            default:
+                throw new IllegalArgumentException("Unsupported native type: " + spec);
+        }
+
+        return params.build();
+    }
+
+    /**
+     * Trims the row down to the elements whose positions are set in the given bit set, preserving their order.
+     *
+     * @param row Source row.
+     * @param requiredElements Positions of the elements to keep, or {@code null} to keep the row as is.
+     * @return The very same array if no projection is requested, otherwise a new array with the required elements only.
+     */
+    private static Object[] project(Object[] row, @Nullable BitSet requiredElements) {
+        return requiredElements == null
+                ? row
+                : requiredElements.stream().mapToObj(idx -> row[idx]).toArray();
+    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestCluster.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestCluster.java
index 9a660217be..33f2e20041 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestCluster.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestCluster.java
@@ -17,9 +17,11 @@
 
 package org.apache.ignite.internal.sql.engine.framework;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.ignite.internal.sql.engine.exec.LifecycleAware;
 import org.apache.ignite.internal.util.IgniteUtils;
 
@@ -33,9 +35,17 @@ import org.apache.ignite.internal.util.IgniteUtils;
  */
 public class TestCluster implements LifecycleAware {
     private final Map<String, TestNode> nodeByName;
+    private final List<LifecycleAware> components;
+    private final Runnable initClosure;
 
-    TestCluster(Map<String, TestNode> nodeByName) {
+    TestCluster(
+            Map<String, TestNode> nodeByName,
+            List<LifecycleAware> components,
+            Runnable initClosure
+    ) {
         this.nodeByName = nodeByName;
+        this.components = components;
+        this.initClosure = initClosure;
     }
 
     /**
@@ -51,16 +61,25 @@ public class TestCluster implements LifecycleAware {
     /** {@inheritDoc} */
     @Override
     public void start() {
+        // Start order: shared components, then the nodes, then the init closure.
+        // stop() closes components and nodes in the reverse order.
+        components.forEach(LifecycleAware::start);
+
         nodeByName.values().forEach(TestNode::start);
+
+        initClosure.run();
     }
 
     /** {@inheritDoc} */
     @Override
     public void stop() throws Exception {
-        List<AutoCloseable> closeables = nodeByName.values().stream()
+        List<AutoCloseable> closeables = Stream.concat(
+                        components.stream(),
+                        nodeByName.values().stream()
+                )
                 .map(node -> ((AutoCloseable) node::stop))
                 .collect(Collectors.toList());
 
+        Collections.reverse(closeables);
         IgniteUtils.closeAll(closeables);
     }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterTest.java
index 0ca5440115..2d6d7c94c6 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterTest.java
@@ -18,19 +18,30 @@
 package org.apache.ignite.internal.sql.engine.framework;
 
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.util.BitSet;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.Flow.Publisher;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.PartitionWithTerm;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
+import org.apache.ignite.internal.sql.engine.exec.ScannableTable;
+import org.apache.ignite.internal.sql.engine.exec.exp.RangeCondition;
 import org.apache.ignite.internal.sql.engine.prepare.Fragment;
 import org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan;
 import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
 import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
 import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
-import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Collation;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.type.NativeTypes;
+import org.apache.ignite.internal.util.SubscriptionUtils;
+import org.apache.ignite.internal.util.subscription.TransformingPublisher;
+import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 
@@ -39,24 +50,65 @@ import org.junit.jupiter.api.Test;
  */
 public class TestClusterTest extends BaseIgniteAbstractTest {
 
-    private final DataProvider<Object[]> dataProvider = DataProvider.fromRow(
-            new Object[]{42, UUID.randomUUID().toString()}, 3_333
-    );
+    /**
+     * Table stub shared by all test tables: every access path publishes copies of one fixed row,
+     * and the paths differ only in the count handed to the data provider.
+     */
+    private final ScannableTable table = new ScannableTable() {
+        @Override
+        public <RowT> Publisher<RowT> scan(ExecutionContext<RowT> ctx, PartitionWithTerm partWithTerm, RowFactory<RowT> rowFactory,
+                @Nullable BitSet requiredColumns) {
+
+            return rows(rowFactory, 3_333);
+        }
+
+        @Override
+        public <RowT> Publisher<RowT> indexRangeScan(ExecutionContext<RowT> ctx, PartitionWithTerm partWithTerm,
+                RowFactory<RowT> rowFactory, int indexId, List<String> columns, @Nullable RangeCondition<RowT> cond,
+                @Nullable BitSet requiredColumns) {
+
+            return rows(rowFactory, 10);
+        }
+
+        @Override
+        public <RowT> Publisher<RowT> indexLookup(ExecutionContext<RowT> ctx, PartitionWithTerm partWithTerm,
+                RowFactory<RowT> rowFactory, int indexId, List<String> columns, RowT key, @Nullable BitSet requiredColumns) {
+
+            return rows(rowFactory, 1);
+        }
+
+        /**
+         * Builds a publisher over a data provider created from a single {@code {42, <random UUID string>}} row
+         * and the given count, converting every element with the given row factory.
+         *
+         * @param rowFactory Factory used to convert the raw arrays to the engine row type.
+         * @param rowCount Count passed to {@code DataProvider.fromRow}.
+         * @return Publisher of converted rows.
+         */
+        private <RowT> Publisher<RowT> rows(RowFactory<RowT> rowFactory, int rowCount) {
+            return new TransformingPublisher<>(
+                    SubscriptionUtils.fromIterable(
+                            DataProvider.fromRow(
+                                    new Object[]{42, UUID.randomUUID().toString()}, rowCount
+                            )
+                    ), rowFactory::create
+            );
+        }
+    };
 
     // @formatter:off
-    private final TestCluster cluster = TestBuilders.cluster().nodes("N1")
+    private final TestCluster cluster = TestBuilders.cluster()
+            .nodes("N1", "N2")
             .addTable()
-            .name("T1")
-            .distribution(IgniteDistributions.hash(List.of(0)))
-            .addColumn("ID", NativeTypes.INT32)
-            .addColumn("VAL", NativeTypes.stringOf(64))
-            .defaultDataProvider(dataProvider)
-            .addHashIndex()
-            .name("IDX_ID")
-            .addColumn("ID")
-            .defaultDataProvider(dataProvider)
-            .end()
-            .end()
+                .name("T1")
+                .addKeyColumn("ID", NativeTypes.INT32)
+                .addColumn("VAL", NativeTypes.stringOf(64))
+                .addSortedIndex()
+                    .name("SORTED_IDX")
+                    .addColumn("ID", Collation.ASC_NULLS_FIRST)
+                    .end()
+                .end()
+            .dataProvider("N1", "T1", table)
+            .dataProvider("N2", "T1", table)
+            // table T2 will be created later by DDL
+            .dataProvider("N1", "T2", table)
+            .dataProvider("N2", "T2", table)
             .build();
     // @formatter:on
 
@@ -85,11 +137,30 @@ public class TestClusterTest extends 
BaseIgniteAbstractTest {
         assertTrue(fragment.root().getInput(0) instanceof IgniteTableScan);
     }
 
-    /**
-     * Runs a SELECT query with condition.
-     */
     @Test
-    public void testQueryWithCondition() {
+    public void testSimpleFromCreatedTableByDdl() {
+        cluster.start();
+
+        var gatewayNode = cluster.node("N1");
+
+        gatewayNode.initSchema(
+                "CREATE TABLE t2 (id INT PRIMARY KEY, val VARCHAR(64))"
+        );
+
+        QueryPlan plan = gatewayNode.prepare("SELECT * FROM t2");
+
+        for (var row : 
await(gatewayNode.executePlan(plan).requestNextAsync(10_000)).items()) {
+            assertNotNull(row);
+        }
+
+        // Ensure the plan contains full table scan.
+        assertTrue(plan instanceof MultiStepPlan);
+        Fragment fragment = ((MultiStepPlan) plan).fragments().get(1);
+        assertTrue(fragment.root().getInput(0) instanceof IgniteTableScan);
+    }
+
+    @Test
+    public void testSelectByKey() {
         cluster.start();
 
         TestNode gatewayNode = cluster.node("N1");
@@ -103,5 +174,24 @@ public class TestClusterTest extends 
BaseIgniteAbstractTest {
         assertTrue(plan instanceof MultiStepPlan);
         Fragment fragment = ((MultiStepPlan) plan).fragments().get(1);
         assertTrue(fragment.root().getInput(0) instanceof IgniteIndexScan);
+        assertEquals("T1_PK", ((IgniteIndexScan) 
fragment.root().getInput(0)).indexName());
+    }
+
+    @Test
+    public void testSelectRange() {
+        cluster.start();
+
+        TestNode gatewayNode = cluster.node("N1");
+        QueryPlan plan = gatewayNode.prepare("SELECT * FROM t1 WHERE ID > 1");
+
+        for (List<?> row : 
await(gatewayNode.executePlan(plan).requestNextAsync(10_000)).items()) {
+            assertNotNull(row);
+        }
+
+        // Ensure the plan uses index.
+        assertTrue(plan instanceof MultiStepPlan);
+        Fragment fragment = ((MultiStepPlan) plan).fragments().get(1);
+        assertTrue(fragment.root().getInput(0) instanceof IgniteIndexScan);
+        assertEquals("SORTED_IDX", ((IgniteIndexScan) 
fragment.root().getInput(0)).indexName());
     }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestIndex.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestIndex.java
index 4f3e891ceb..188f612a9c 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestIndex.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestIndex.java
@@ -17,10 +17,7 @@
 
 package org.apache.ignite.internal.sql.engine.framework;
 
-import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
-
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
@@ -29,23 +26,18 @@ import 
org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
 import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
 
 /**
- * A test index that implements all the necessary for the optimizer methods to 
be used to prepare a query, as well as provides access to the
- * data to use this index in execution-related scenarios.
+ * A test index that implements all the necessary for the optimizer methods to 
be used to prepare a query.
  */
 public class TestIndex extends IgniteIndex {
-    private static final String DATA_PROVIDER_NOT_CONFIGURED_MESSAGE_TEMPLATE =
-            "DataProvider is not configured [index={}, node={}]";
-
     /** Factory method for creating hash-index. */
     static TestIndex createHash(
             String name,
             List<String> columns,
-            TableDescriptor tableDescriptor,
-            Map<String, DataProvider<?>> dataProviders
+            TableDescriptor tableDescriptor
     ) {
         RelCollation collation = TraitUtils.createCollation(columns, null, 
tableDescriptor);
 
-        return new TestIndex(name, Type.HASH, tableDescriptor.distribution(), 
collation, dataProviders);
+        return new TestIndex(name, Type.HASH, tableDescriptor.distribution(), 
collation);
     }
 
     /** Factory method for creating sorted-index. */
@@ -53,44 +45,22 @@ public class TestIndex extends IgniteIndex {
             String name,
             List<String> columns,
             List<Collation> collations,
-            TableDescriptor tableDescriptor,
-            Map<String, DataProvider<?>> dataProviders
+            TableDescriptor tableDescriptor
     ) {
         RelCollation collation = TraitUtils.createCollation(columns, 
collations, tableDescriptor);
 
-        return new TestIndex(name, Type.SORTED, 
tableDescriptor.distribution(), collation, dataProviders);
+        return new TestIndex(name, Type.SORTED, 
tableDescriptor.distribution(), collation);
     }
 
     private static final AtomicInteger ID = new AtomicInteger();
 
-    private final Map<String, DataProvider<?>> dataProviders;
-
     /** Constructor. */
     TestIndex(
             String name,
             Type type,
             IgniteDistribution distribution,
-            RelCollation collation,
-            Map<String, DataProvider<?>> dataProviders
+            RelCollation collation
     ) {
         super(ID.incrementAndGet(), name, type, distribution, collation);
-
-        this.dataProviders = dataProviders;
-    }
-
-    /**
-     * Returns the data provider for the given node.
-     *
-     * @param nodeName Name of the node of interest.
-     * @param <RowT> A type of the rows the data provider should produce.
-     * @return A data provider for the node of interest.
-     * @throws AssertionError in case data provider is not configured for the 
given node.
-     */
-    <RowT> DataProvider<RowT> dataProvider(String nodeName) {
-        if (!dataProviders.containsKey(nodeName)) {
-            throw new 
AssertionError(format(DATA_PROVIDER_NOT_CONFIGURED_MESSAGE_TEMPLATE, name(), 
nodeName));
-        }
-
-        return (DataProvider<RowT>) dataProviders.get(nodeName);
     }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
index 29359eaebc..fd7d59f7f4 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
@@ -18,56 +18,44 @@
 package org.apache.ignite.internal.sql.engine.framework;
 
 import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
-import static 
org.apache.ignite.internal.sql.engine.exec.ExecutionServiceImplTest.PLANNING_TIMEOUT;
 import static 
org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.Mockito.mock;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.calcite.tools.Frameworks;
-import org.apache.ignite.internal.metrics.MetricManager;
 import org.apache.ignite.internal.sql.engine.QueryCancel;
+import org.apache.ignite.internal.sql.engine.SqlQueryType;
 import org.apache.ignite.internal.sql.engine.exec.ExchangeService;
 import org.apache.ignite.internal.sql.engine.exec.ExchangeServiceImpl;
+import org.apache.ignite.internal.sql.engine.exec.ExecutableTableRegistry;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionDependencyResolver;
 import 
org.apache.ignite.internal.sql.engine.exec.ExecutionDependencyResolverImpl;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionService;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionServiceImpl;
 import org.apache.ignite.internal.sql.engine.exec.LifecycleAware;
-import org.apache.ignite.internal.sql.engine.exec.LogicalRelImplementor;
 import org.apache.ignite.internal.sql.engine.exec.MailboxRegistry;
 import org.apache.ignite.internal.sql.engine.exec.MailboxRegistryImpl;
-import org.apache.ignite.internal.sql.engine.exec.NoOpExecutableTableRegistry;
 import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
 import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler;
 import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
 import org.apache.ignite.internal.sql.engine.exec.mapping.MappingService;
-import org.apache.ignite.internal.sql.engine.exec.rel.Node;
-import org.apache.ignite.internal.sql.engine.exec.rel.ScanNode;
 import org.apache.ignite.internal.sql.engine.message.MessageService;
 import org.apache.ignite.internal.sql.engine.message.MessageServiceImpl;
 import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
-import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl;
 import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
-import 
org.apache.ignite.internal.sql.engine.prepare.ddl.DdlSqlToCommandConverter;
-import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
-import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
 import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
 import org.apache.ignite.internal.sql.engine.sql.ParsedResult;
 import org.apache.ignite.internal.sql.engine.sql.ParserService;
-import org.apache.ignite.internal.sql.engine.sql.ParserServiceImpl;
 import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
-import org.apache.ignite.internal.sql.engine.util.EmptyCacheFactory;
-import org.apache.ignite.internal.sql.engine.util.HashFunctionFactoryImpl;
-import org.apache.ignite.internal.sql.engine.util.cache.CaffeineCacheFactory;
 import org.apache.ignite.internal.util.AsyncCursor;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.StringUtils;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.MessagingService;
 import org.apache.ignite.network.TopologyService;
@@ -96,13 +84,16 @@ public class TestNode implements LifecycleAware {
     TestNode(
             String nodeName,
             ClusterService clusterService,
+            ParserService parserService,
+            PrepareService prepareService,
             SqlSchemaManager schemaManager,
-            MappingService mappingService
+            MappingService mappingService,
+            ExecutableTableRegistry tableRegistry,
+            DdlCommandHandler ddlCommandHandler
     ) {
         this.nodeName = nodeName;
-        var ps = new PrepareServiceImpl(nodeName, 0, 
CaffeineCacheFactory.INSTANCE,
-                mock(DdlSqlToCommandConverter.class), PLANNING_TIMEOUT, 
mock(MetricManager.class));
-        this.prepareService = registerService(ps);
+        this.parserService = parserService;
+        this.prepareService = prepareService;
         this.schemaManager = schemaManager;
 
         TopologyService topologyService = clusterService.topologyService();
@@ -113,52 +104,27 @@ public class TestNode implements LifecycleAware {
         QueryTaskExecutor taskExecutor = registerService(new 
QueryTaskExecutorImpl(nodeName));
 
         MessageService messageService = registerService(new MessageServiceImpl(
-                topologyService.localMember().name(), messagingService, 
taskExecutor, new IgniteSpinBusyLock()
+                nodeName, messagingService, taskExecutor, new 
IgniteSpinBusyLock()
         ));
         ExchangeService exchangeService = registerService(new 
ExchangeServiceImpl(
                 mailboxRegistry, messageService
         ));
-        NoOpExecutableTableRegistry executableTableRegistry = new 
NoOpExecutableTableRegistry();
         ExecutionDependencyResolver dependencyResolver = new 
ExecutionDependencyResolverImpl(
-                executableTableRegistry, null
+                tableRegistry, null
         );
 
-        executionService = registerService(new ExecutionServiceImpl<>(
-                messageService,
+        executionService = registerService(ExecutionServiceImpl.create(
                 topologyService,
-                mappingService,
+                messageService,
                 schemaManager,
-                mock(DdlCommandHandler.class),
+                ddlCommandHandler,
                 taskExecutor,
                 rowHandler,
-                dependencyResolver,
-                (ctx, deps) -> new LogicalRelImplementor<Object[]>(
-                        ctx,
-                        new HashFunctionFactoryImpl<>(rowHandler),
-                        mailboxRegistry,
-                        exchangeService,
-                        deps
-                ) {
-                    @Override
-                    public Node<Object[]> visit(IgniteTableScan rel) {
-                        DataProvider<Object[]> dataProvider = 
rel.getTable().unwrap(TestTable.class).dataProvider(ctx.localNode().name());
-
-                        return new ScanNode<>(ctx, dataProvider);
-                    }
-
-                    @Override
-                    public Node<Object[]> visit(IgniteIndexScan rel) {
-                        TestTable tbl = rel.getTable().unwrap(TestTable.class);
-                        TestIndex idx = (TestIndex) 
tbl.indexes().get(rel.indexName());
-
-                        DataProvider<Object[]> dataProvider = 
idx.dataProvider(ctx.localNode().name());
-
-                        return new ScanNode<>(ctx, dataProvider);
-                    }
-                }
+                mailboxRegistry,
+                exchangeService,
+                mappingService,
+                dependencyResolver
         ));
-
-        parserService = new ParserServiceImpl(0, EmptyCacheFactory.INSTANCE);
     }
 
     /** {@inheritDoc} */
@@ -221,6 +187,36 @@ public class TestNode implements LifecycleAware {
         return await(prepareService.prepareAsync(parsedResult, 
createContext()));
     }
 
+    /**
+     * Executes the given script.
+     *
+     * <p>The script is split by semicolon and the resulting statements are processed one by one.
+     * Lines consisting only of a {@code --} comment are removed first, so a comment placed in
+     * front of a statement does not cause that statement to be skipped. Every statement is
+     * prepared, but only DDL and DML plans are executed; their results are not returned.
+     *
+     * @param script Script to execute.
+     */
+    public void initSchema(String script) {
+        for (String statement : script.split(";")) {
+            // Splitting by ';' leaves a leading comment (e.g. a file header) glued to the statement
+            // that follows it, so strip comment-only lines instead of dropping the whole chunk.
+            String sql = statement.replaceAll("(?m)^[ \\t]*--.*$", "");
+
+            if (StringUtils.nullOrBlank(sql)) {
+                continue;
+            }
+
+            ParsedResult parsedResult = parserService.parse(sql);
+            BaseQueryContext ctx = createContext();
+
+            QueryPlan plan = await(prepareService.prepareAsync(parsedResult, ctx));
+
+            // Anything that is neither DDL nor DML cannot change the schema or data: skip execution.
+            if (plan.type() != SqlQueryType.DDL && plan.type() != SqlQueryType.DML) {
+                continue;
+            }
+
+            AsyncCursor<?> cursor = executionService.executePlan(new NoOpTransaction("tx"), plan, ctx);
+
+            // Request the first batch and wait for it, so the statement has run before the next one starts.
+            await(cursor.requestNextAsync(1));
+        }
+    }
+
     private BaseQueryContext createContext() {
         return BaseQueryContext.builder()
                 .cancel(new QueryCancel())
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java
index eee7c3288c..651e74d55a 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java
@@ -38,7 +38,6 @@ import 
org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
 import org.apache.ignite.internal.sql.engine.property.PropertiesHolder;
 import org.apache.ignite.internal.sql.engine.session.SessionId;
 import org.apache.ignite.internal.sql.engine.session.SessionInfo;
-import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.type.NativeTypes;
 import org.apache.ignite.internal.util.AsyncCursor;
@@ -71,11 +70,10 @@ public class QueryCheckerTest extends 
BaseIgniteAbstractTest {
                     .name("T1")
                     .addKeyColumn("ID", NativeTypes.INT32)
                     .addColumn("VAL", NativeTypes.INT32)
-                    .distribution(IgniteDistributions.hash(List.of(0)))
-                    .defaultDataProvider(DataProvider.fromCollection(List.of(
-                            new Object[] {1, 1}, new Object[] {2, 2}
-                    )))
                     .end()
+            .dataProvider(NODE_NAME, "T1", 
TestBuilders.tableScan(DataProvider.fromCollection(List.of(
+                    new Object[] {1, 1}, new Object[] {2, 2}
+            ))))
             .build();
     // @formatter:on
 
diff --git 
a/modules/sql-engine/src/test/resources/tpch/schema_definition_ddl.sql 
b/modules/sql-engine/src/test/resources/tpch/schema_definition_ddl.sql
new file mode 100644
index 0000000000..18f080cf3d
--- /dev/null
+++ b/modules/sql-engine/src/test/resources/tpch/schema_definition_ddl.sql
@@ -0,0 +1,125 @@
+-- noinspection SqlDialectInspectionForFile
+-- noinspection SqlNoDataSourceInspectionForFile
+
+DROP TABLE IF EXISTS nation;
+DROP TABLE IF EXISTS region;
+DROP TABLE IF EXISTS part;
+DROP TABLE IF EXISTS supplier;
+DROP TABLE IF EXISTS partsupp;
+DROP TABLE IF EXISTS orders;
+DROP TABLE IF EXISTS customer;
+DROP TABLE IF EXISTS lineitem;
+
+CREATE TABLE region (
+    r_regionkey integer  NOT NULL,
+    r_name      char(25) NOT NULL,
+    r_comment   varchar(152),
+    PRIMARY KEY (r_regionkey)
+);
+
+CREATE TABLE nation (
+    n_nationkey integer  NOT NULL,
+    n_name      char(25) NOT NULL,
+    n_regionkey integer  NOT NULL,
+    n_comment   varchar(152),
+    PRIMARY KEY (n_nationkey)
+);
+
+CREATE INDEX n_rk ON nation (n_regionkey ASC);
+
+CREATE TABLE part (
+    p_partkey     integer        NOT NULL,
+    p_name        varchar(55)    NOT NULL,
+    p_mfgr        char(25)       NOT NULL,
+    p_brand       char(10)       NOT NULL,
+    p_type        varchar(25)    NOT NULL,
+    p_size        integer        NOT NULL,
+    p_container   char(10)       NOT NULL,
+    p_retailprice decimal(15, 2) NOT NULL,
+    p_comment     varchar(23)    NOT NULL,
+    PRIMARY KEY (p_partkey)
+);
+
+
+CREATE TABLE supplier (
+    s_suppkey   integer        NOT NULL,
+    s_name      char(25)       NOT NULL,
+    s_address   varchar(40)    NOT NULL,
+    s_nationkey integer        NOT NULL,
+    s_phone     char(15)       NOT NULL,
+    s_acctbal   decimal(15, 2) NOT NULL,
+    s_comment   varchar(101)   NOT NULL,
+    PRIMARY KEY (s_suppkey)
+);
+
+CREATE INDEX s_nk ON supplier (s_nationkey ASC);
+
+CREATE TABLE partsupp (
+    ps_partkey    integer        NOT NULL,
+    ps_suppkey    integer        NOT NULL,
+    ps_availqty   integer        NOT NULL,
+    ps_supplycost decimal(15, 2) NOT NULL,
+    ps_comment    varchar(199)   NOT NULL,
+    PRIMARY KEY (ps_partkey, ps_suppkey)
+);
+
+CREATE INDEX ps_sk ON partsupp (ps_suppkey ASC);
+CREATE INDEX ps_sk_pk ON partsupp (ps_suppkey ASC, ps_partkey ASC);
+
+CREATE TABLE customer (
+    c_custkey    integer        NOT NULL,
+    c_name       varchar(25)    NOT NULL,
+    c_address    varchar(40)    NOT NULL,
+    c_nationkey  integer        NOT NULL,
+    c_phone      char(15)       NOT NULL,
+    c_acctbal    decimal(15, 2) NOT NULL,
+    c_mktsegment char(10)       NOT NULL,
+    c_comment    varchar(117)   NOT NULL,
+    PRIMARY KEY (c_custkey)
+);
+
+CREATE INDEX c_nk ON customer (c_nationkey ASC);
+
+CREATE TABLE orders (
+    o_orderkey      integer        NOT NULL,
+    o_custkey       integer        NOT NULL,
+    o_orderstatus   char(1)        NOT NULL,
+    o_totalprice    decimal(15, 2) NOT NULL,
+    o_orderdate     date           NOT NULL,
+    o_orderpriority char(15)       NOT NULL,
+    o_clerk         char(15)       NOT NULL,
+    o_shippriority  integer        NOT NULL,
+    o_comment       varchar(79)    NOT NULL,
+    PRIMARY KEY (o_orderkey)
+);
+
+CREATE INDEX o_ck ON orders (o_custkey ASC);
+CREATE INDEX o_od ON orders (o_orderdate ASC);
+
+CREATE TABLE lineitem (
+    l_orderkey      integer        NOT NULL,
+    l_partkey       integer        NOT NULL,
+    l_suppkey       integer        NOT NULL,
+    l_linenumber    integer        NOT NULL,
+    l_quantity      decimal(15, 2) NOT NULL,
+    l_extendedprice decimal(15, 2) NOT NULL,
+    l_discount      decimal(15, 2) NOT NULL,
+    l_tax           decimal(15, 2) NOT NULL,
+    l_returnflag    char(1)        NOT NULL,
+    l_linestatus    char(1)        NOT NULL,
+    l_shipdate      date           NOT NULL,
+    l_commitdate    date           NOT NULL,
+    l_receiptdate   date           NOT NULL,
+    l_shipinstruct  char(25)       NOT NULL,
+    l_shipmode      char(10)       NOT NULL,
+    l_comment       varchar(44)    NOT NULL,
+    PRIMARY KEY (l_orderkey, l_linenumber)
+);
+
+CREATE INDEX l_pk ON lineitem (l_partkey ASC);
+CREATE INDEX l_sk ON lineitem (l_suppkey ASC);
+CREATE INDEX l_sd ON lineitem (l_shipdate ASC);
+CREATE INDEX l_cd ON lineitem (l_commitdate ASC);
+CREATE INDEX l_rd ON lineitem (l_receiptdate ASC);
+CREATE INDEX l_pk_sk ON lineitem (l_partkey ASC, l_suppkey ASC);
+CREATE INDEX l_sk_pk ON lineitem (l_suppkey ASC, l_partkey ASC);


Reply via email to