This is an automated email from the ASF dual-hosted git repository.

korlov pushed a commit to branch ignite-17275
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 13102dcb379f8cb08f8ce562411d8bf39d677cc0
Author: korlov42 <[email protected]>
AuthorDate: Fri Dec 23 11:49:09 2022 +0200

    IGNITE-17275 Sql. Performance testing of pure SQL execution
---
 modules/sql-engine/pom.xml                         |  25 ++
 .../internal/sql/engine/SqlQueryProcessor.java     |  10 +-
 .../sql/engine/exec/ExecutionServiceImpl.java      |  28 +-
 .../sql/engine/schema/ColumnDescriptorImpl.java    |   3 +-
 .../internal/sql/engine/schema/IgniteSchema.java   |  13 +-
 .../sql/engine/benchmarks/SqlBenchmark.java        | 152 ++++++++++
 .../sql/engine/exec/ExecutionServiceImplTest.java  |  40 +--
 .../sql/engine/framework/DataProvider.java         |  43 +++
 .../engine/framework/PredefinedSchemaManager.java  |  80 ++++++
 .../sql/engine/framework/TestBuilders.java         | 291 ++++++++++++++++++++
 .../internal/sql/engine/framework/TestCluster.java |  66 +++++
 .../sql/engine/framework/TestClusterService.java   | 199 ++++++++++++++
 .../internal/sql/engine/framework/TestNode.java    | 199 ++++++++++++++
 .../internal/sql/engine/framework/TestTable.java   | 305 +++++++++++++++++++++
 .../sql/engine/planner/AbstractPlannerTest.java    |  33 +--
 15 files changed, 1414 insertions(+), 73 deletions(-)

diff --git a/modules/sql-engine/pom.xml b/modules/sql-engine/pom.xml
index 6ad7ca90dc..4066df87cf 100644
--- a/modules/sql-engine/pom.xml
+++ b/modules/sql-engine/pom.xml
@@ -208,6 +208,19 @@
             <artifactId>slf4j-jdk14</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <!-- Benchmark dependencies -->
+        <dependency>
+            <groupId>org.openjdk.jmh</groupId>
+            <artifactId>jmh-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.openjdk.jmh</groupId>
+            <artifactId>jmh-generator-annprocess</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
@@ -313,6 +326,12 @@
                         <artifactId>value</artifactId>
                         <version>${immutables.version}</version>
                     </dependency>
+
+                    <dependency>
+                        <groupId>org.openjdk.jmh</groupId>
+                        <artifactId>jmh-generator-annprocess</artifactId>
+                        <version>${jmh.framework.version}</version>
+                    </dependency>
                 </dependencies>
                 <configuration>
 <!--                    <compilerArgs>
@@ -330,6 +349,12 @@
                             <artifactId>value</artifactId>
                             <version>${immutables.version}</version>
                         </path>
+
+                        <path>
+                            <groupId>org.openjdk.jmh</groupId>
+                            <artifactId>jmh-generator-annprocess</artifactId>
+                            <version>${jmh.framework.version}</version>
+                        </path>
                     </annotationProcessorPaths>
                 </configuration>
             </plugin>
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 0ce7773d9e..eecaf29519 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -62,6 +62,7 @@ import 
org.apache.ignite.internal.sql.engine.exec.LifecycleAware;
 import org.apache.ignite.internal.sql.engine.exec.MailboxRegistryImpl;
 import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
 import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl;
+import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
 import org.apache.ignite.internal.sql.engine.message.MessageServiceImpl;
 import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
 import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl;
@@ -210,18 +211,17 @@ public class SqlQueryProcessor implements QueryProcessor {
 
         this.prepareSvc = prepareSvc;
 
+        var ddlCommandHandler = new DdlCommandHandler(distributionZoneManager, 
tableManager, indexManager, dataStorageManager);
+
         var executionSrvc = registerService(ExecutionServiceImpl.create(
                 clusterSrvc.topologyService(),
                 msgSrvc,
                 sqlSchemaManager,
-                distributionZoneManager,
-                tableManager,
-                indexManager,
+                ddlCommandHandler,
                 taskExecutor,
                 ArrayRowHandler.INSTANCE,
                 mailboxRegistry,
-                exchangeService,
-                dataStorageManager
+                exchangeService
         ));
 
         clusterSrvc.topologyService().addEventHandler(executionSrvc);
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index 37a243efce..9b4d7027eb 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -39,9 +39,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.ignite.configuration.ConfigurationChangeException;
-import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.index.IndexManager;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.sql.engine.AsyncCursor;
@@ -72,8 +70,6 @@ import 
org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.sql.engine.util.HashFunctionFactoryImpl;
 import org.apache.ignite.internal.sql.engine.util.TypeUtils;
-import org.apache.ignite.internal.storage.DataStorageManager;
-import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
 import org.apache.ignite.lang.IgniteInternalException;
@@ -117,13 +113,11 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
      * @param topSrvc Topology service.
      * @param msgSrvc Message service.
      * @param sqlSchemaManager Schema manager.
-     * @param indexManager Index manager.
-     * @param tblManager Table manager.
+     * @param ddlCommandHandler Handler of the DDL commands.
      * @param taskExecutor Task executor.
      * @param handler Row handler.
      * @param mailboxRegistry Mailbox registry.
      * @param exchangeSrvc Exchange service.
-     * @param dataStorageManager Storage manager.
      * @param <RowT> Type of the sql row.
      * @return An execution service.
      */
@@ -131,21 +125,18 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
             TopologyService topSrvc,
             MessageService msgSrvc,
             SqlSchemaManager sqlSchemaManager,
-            DistributionZoneManager distributionZoneManager,
-            TableManager tblManager,
-            IndexManager indexManager,
+            DdlCommandHandler ddlCommandHandler,
             QueryTaskExecutor taskExecutor,
             RowHandler<RowT> handler,
             MailboxRegistry mailboxRegistry,
-            ExchangeService exchangeSrvc,
-            DataStorageManager dataStorageManager
+            ExchangeService exchangeSrvc
     ) {
         return new ExecutionServiceImpl<>(
                 topSrvc.localMember(),
                 msgSrvc,
                 new MappingServiceImpl(topSrvc),
                 sqlSchemaManager,
-                new DdlCommandHandler(distributionZoneManager, tblManager, 
indexManager, dataStorageManager),
+                ddlCommandHandler,
                 taskExecutor,
                 handler,
                 exchangeSrvc,
@@ -161,7 +152,7 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
     /**
      * Constructor. TODO Documentation 
https://issues.apache.org/jira/browse/IGNITE-15859
      */
-    ExecutionServiceImpl(
+    public ExecutionServiceImpl(
             ClusterNode localNode,
             MessageService msgSrvc,
             MappingService mappingSrvc,
@@ -718,8 +709,15 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
         }
     }
 
+    /**
+     * A factory of the relational node implementors.
+     *
+     * @param <RowT> A type of the row the execution tree will be working with.
+     * @see LogicalRelImplementor
+     */
     @FunctionalInterface
-    interface ImplementorFactory<RowT> {
+    public interface ImplementorFactory<RowT> {
+        /** Creates the relational node implementor with the given context. */
         LogicalRelImplementor<RowT> create(ExecutionContext<RowT> ctx);
     }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/ColumnDescriptorImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/ColumnDescriptorImpl.java
index 76f56d23a3..2dcc455ecb 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/ColumnDescriptorImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/ColumnDescriptorImpl.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.sql.engine.schema;
 import java.util.Objects;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.schema.NativeType;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Simple implementation of {@link ColumnDescriptor}.
@@ -65,7 +66,7 @@ public class ColumnDescriptorImpl implements ColumnDescriptor 
{
             int physicalIndex,
             NativeType type,
             DefaultValueStrategy defaultStrategy,
-            Supplier<Object> dfltVal
+            @Nullable Supplier<Object> dfltVal
     ) {
         this.key = key;
         this.nullable = nullable;
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
index 52aca0781d..cd64f671a6 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
@@ -23,6 +23,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.calcite.schema.Table;
 import org.apache.calcite.schema.impl.AbstractSchema;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Ignite schema.
@@ -38,11 +39,17 @@ public class IgniteSchema extends AbstractSchema {
      * Creates a Schema.
      *
      * @param schemaName Schema name.
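+     * @param tableMap Tables to populate the schema with, keyed by table name; {@code null} means no tables.
+     * @param indexMap Indexes to populate the schema with, keyed by UUID; {@code null} means no indexes.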
      */
-    public IgniteSchema(String schemaName, Map<String, Table> tblMap, 
Map<UUID, IgniteIndex> idxMap) {
+    public IgniteSchema(
+            String schemaName,
+            @Nullable Map<String, Table> tableMap,
+            @Nullable Map<UUID, IgniteIndex> indexMap
+    ) {
         this.schemaName = schemaName;
-        this.tblMap = tblMap == null ? new ConcurrentHashMap<>() : new 
ConcurrentHashMap<>(tblMap);
-        this.idxMap = idxMap == null ? new ConcurrentHashMap<>() : new 
ConcurrentHashMap<>(idxMap);
+        this.tblMap = tableMap == null ? new ConcurrentHashMap<>() : new 
ConcurrentHashMap<>(tableMap);
+        this.idxMap = indexMap == null ? new ConcurrentHashMap<>() : new 
ConcurrentHashMap<>(indexMap);
     }
 
     /**
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/SqlBenchmark.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/SqlBenchmark.java
new file mode 100644
index 0000000000..7aca579f3e
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/SqlBenchmark.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.benchmarks;
+
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.sql.engine.framework.DataProvider;
+import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
+import org.apache.ignite.internal.sql.engine.framework.TestCluster;
+import org.apache.ignite.internal.sql.engine.framework.TestNode;
+import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/**
+ * A micro-benchmark of sql execution.
+ */
+@Warmup(iterations = 50, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 20, time = 1, timeUnit = TimeUnit.SECONDS)
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(TimeUnit.SECONDS)
+@Fork(1)
+@State(Scope.Benchmark)
+public class SqlBenchmark {
+    private final DataProvider<Object[]> dataProvider = new 
SameRowDataProvider(
+            new Object[] {42, UUID.randomUUID().toString()}, 333
+    );
+
+    // @formatter:off
+    private final TestCluster cluster = TestBuilders.cluster()
+            .nodes("N1", "N2", "N3")
+            .addTable()
+                    .name("T1")
+                    .distribution(IgniteDistributions.hash(List.of(0)))
+                    .addColumn("ID", NativeTypes.INT32)
+                    .addColumn("VAL", NativeTypes.stringOf(64))
+                    .addDataProvider("N1", dataProvider)
+                    .addDataProvider("N2", dataProvider)
+                    .addDataProvider("N3", dataProvider)
+                    .end()
+            .build();
+    // @formatter:on
+
+    private final TestNode gatewayNode = cluster.node("N1");
+
+    private QueryPlan plan;
+
+    /** Starts the cluster and prepares the plan of the query. */
+    @Setup
+    public void setUp() {
+        cluster.start();
+
+        plan = gatewayNode.prepare("SELECT * FROM t1");
+    }
+
+    /** Stops the cluster. */
+    @TearDown
+    public void tearDown() throws Exception {
+        cluster.stop();
+    }
+
+    /** Very simple test to measure the performance of the simplest possible distributed query. */
+    @Benchmark
+    public void selectAllSimple(Blackhole bh) {
+        for (var row : 
await(gatewayNode.executePlan(plan).requestNextAsync(1_000)).items()) {
+            bh.consume(row);
+        }
+    }
+
+    /**
+     * Runs the benchmark.
+     *
+     * @param args args
+     * @throws Exception if something goes wrong
+     */
+    public static void main(String[] args) throws Exception {
+        Options build = new OptionsBuilder()
+                //.addProfiler("gc")
+                .include(SqlBenchmark.class.getName())
+                .build();
+
+        new Runner(build).run();
+    }
+
+    private static class SameRowDataProvider implements DataProvider<Object[]> 
{
+        private final int times;
+        private final Object[] row;
+
+        SameRowDataProvider(Object[] row, int times) {
+            this.times = times;
+            this.row = row;
+        }
+
+        @Override
+        public Iterator<Object[]> iterator() {
+            return new Iterator<>() {
+                private int counter;
+
+                @Override
+                public boolean hasNext() {
+                    return counter < times;
+                }
+
+                @Override
+                public Object[] next() {
+                    if (!hasNext()) {
+                        throw new NoSuchElementException();
+                    }
+
+                    counter++;
+
+                    return row;
+                }
+            };
+        }
+    }
+}
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index 92e963db86..49771678a9 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.sql.engine.exec;
 
-import static 
org.apache.ignite.internal.sql.engine.util.BaseQueryContext.CLUSTER;
 import static 
org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
@@ -35,6 +34,7 @@ import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
@@ -45,18 +45,20 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.sql.engine.AsyncCursor.BatchedResult;
 import org.apache.ignite.internal.sql.engine.QueryCancel;
 import 
org.apache.ignite.internal.sql.engine.exec.ExecutionServiceImplTest.TestCluster.TestNode;
 import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
 import org.apache.ignite.internal.sql.engine.exec.rel.Node;
 import org.apache.ignite.internal.sql.engine.exec.rel.ScanNode;
+import org.apache.ignite.internal.sql.engine.framework.TestTable;
 import 
org.apache.ignite.internal.sql.engine.message.ExecutionContextAwareMessage;
 import org.apache.ignite.internal.sql.engine.message.MessageListener;
 import org.apache.ignite.internal.sql.engine.message.MessageService;
@@ -64,14 +66,16 @@ import 
org.apache.ignite.internal.sql.engine.message.QueryStartRequest;
 import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesFactory;
 import org.apache.ignite.internal.sql.engine.metadata.ColocationGroup;
 import org.apache.ignite.internal.sql.engine.metadata.RemoteException;
-import 
org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest.TestTable;
-import org.apache.ignite.internal.sql.engine.prepare.MappingQueryContext;
 import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
 import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl;
 import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
 import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
+import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
+import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptorImpl;
+import org.apache.ignite.internal.sql.engine.schema.DefaultValueStrategy;
 import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
 import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
+import org.apache.ignite.internal.sql.engine.schema.TableDescriptorImpl;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
@@ -108,7 +112,7 @@ public class ExecutionServiceImplTest {
     );
 
     private final TestTable table = createTable("TEST_TBL", 1_000_000, 
IgniteDistributions.random(),
-            "ID", Integer.class, "VAL", Integer.class);
+            "ID", NativeTypes.INT32, "VAL", NativeTypes.INT32);
 
     private final IgniteSchema schema = new IgniteSchema("PUBLIC", 
Map.of(table.name(), table), null);
 
@@ -588,22 +592,22 @@ public class ExecutionServiceImplTest {
             throw new IllegalArgumentException("'fields' should be non-null 
array with even number of elements");
         }
 
-        RelDataTypeFactory.Builder b = new 
RelDataTypeFactory.Builder(CLUSTER.getTypeFactory());
+        List<ColumnDescriptor> columns = new ArrayList<>();
 
         for (int i = 0; i < fields.length; i += 2) {
-            b.add((String) fields[i], 
CLUSTER.getTypeFactory().createJavaType((Class<?>) fields[i + 1]));
+            columns.add(
+                    new ColumnDescriptorImpl(
+                            (String) fields[i], false, true, i, i,
+                            (NativeType) fields[i + 1], 
DefaultValueStrategy.DEFAULT_NULL, null
+                    )
+            );
         }
 
-        return new TestTable(name, b.build(), size) {
-            @Override
-            public IgniteDistribution distribution() {
-                return distr;
-            }
-
-            @Override
-            public ColocationGroup colocationGroup(MappingQueryContext ctx) {
-                return ColocationGroup.forNodes(nodeNames);
-            }
-        };
+        return new TestTable(
+                new TableDescriptorImpl(columns, distr),
+                name,
+                ColocationGroup.forNodes(nodeNames),
+                size
+        );
     }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/DataProvider.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/DataProvider.java
new file mode 100644
index 0000000000..a143f4c6ae
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/DataProvider.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.framework;
+
+import java.util.Collection;
+
+/**
+ * Producer of the rows to use with {@link TestTable} in execution-related 
scenarios.
+ *
+ * <p>A data provider is supposed to be created for a table on a per-node basis. It's up
+ * to the developer to keep the data provider in sync with the schema of the table it relates to.
+ *
+ * @param <T> A type of the produced elements.
+ * @see TestTable
+ */
+@FunctionalInterface
+public interface DataProvider<T> extends Iterable<T> {
+    /**
+     * Creates data provider from given collection.
+     *
+     * @param collection Collection to use as source of data.
+     * @param <T> A type of the produced elements.
+     * @return A data provider instance backed by given collection.
+     */
+    static <T> DataProvider<T> fromCollection(Collection<T> collection) {
+        return collection::iterator;
+    }
+}
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java
new file mode 100644
index 0000000000..bd9e9bc4ce
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.framework;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Dummy wrapper for predefined collection of schemas.
+ *
+ * <p>Accepts a collection of {@link IgniteSchema schemas} as a parameter and implements the required
+ * methods of {@link SqlSchemaManager} around them. Assumes the given schemas will never be changed.
+ *
+ * @see IgniteSchema
+ * @see SqlSchemaManager
+ */
+public class PredefinedSchemaManager implements SqlSchemaManager {
+    private final SchemaPlus root;
+    private final Map<UUID, IgniteTable> tableById;
+
+    /** Constructs schema manager from a single schema. */
+    public PredefinedSchemaManager(IgniteSchema schema) {
+        this(List.of(schema));
+    }
+
+    /** Constructs schema manager from a collection of schemas. */
+    public PredefinedSchemaManager(Collection<IgniteSchema> schemas) {
+        this.root = Frameworks.createRootSchema(false);
+        this.tableById = new HashMap<>();
+
+        for (IgniteSchema schema : schemas) {
+            root.add(schema.getName(), schema);
+
+            tableById.putAll(
+                    schema.getTableNames().stream()
+                            .map(schema::getTable)
+                            .map(IgniteTable.class::cast)
+                            .collect(Collectors.toMap(IgniteTable::id, 
Function.identity()))
+            );
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public SchemaPlus schema(@Nullable String schema) {
+        return schema == null ? root : root.getSubSchema(schema);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public IgniteTable tableById(UUID id, int ver) {
+        return tableById.get(id);
+    }
+}
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
new file mode 100644
index 0000000000..23aaa6bcb1
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.framework;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.calcite.schema.Table;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
+import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptorImpl;
+import org.apache.ignite.internal.sql.engine.schema.DefaultValueStrategy;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
+import org.apache.ignite.internal.sql.engine.schema.TableDescriptorImpl;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
+
+/**
+ * A collection of builders to create test objects.
+ */
+public class TestBuilders {
+    /**
+     * Returns a builder of the test cluster object.
+     *
+     * @return A new instance of the cluster builder.
+     */
+    public static ClusterBuilder cluster() {
+        return new ClusterBuilderImpl();
+    }
+
+    /**
+     * Returns a builder of the test table object.
+     *
+     * @return A new instance of the table builder.
+     */
+    public static TableBuilder table() {
+        return new TableBuilderImpl();
+    }
+
+    /**
+     * A builder to create a test cluster object.
+     *
+     * @see TestCluster
+     */
+    public interface ClusterBuilder {
+        /**
+         * Sets desired names for the cluster nodes.
+         *
+         * @param nodeNames An array of node names to create cluster from.
+         * @return {@code this} for chaining.
+         */
+        ClusterBuilder nodes(String... nodeNames);
+
+        /**
+         * Creates a table builder to add to the cluster.
+         *
+         * <p>The returned builder is a nested one: invoke {@code end()} on it to get back
+         * to this cluster builder.
+         *
+         * @return An instance of table builder.
+         */
+        ClusterTableBuilder addTable();
+
+        /**
+         * Builds the cluster object.
+         *
+         * @return Created cluster object.
+         */
+        TestCluster build();
+    }
+
+    /**
+     * A builder to create a test table object.
+     *
+     * @see TestTable
+     */
+    public interface TableBuilder extends TableBuilderBase<TableBuilder> {
+        /**
+         * Builds a table.
+         *
+         * @return Created table object.
+         */
+        TestTable build();
+    }
+
+    /**
+     * A builder to create a test table as nested object of the cluster.
+     *
+     * <p>Exposes the same table-related settings as {@link TableBuilder}, but instead of building
+     * the table directly it returns to the enclosing {@link ClusterBuilder} via {@code end()}.
+     *
+     * @see TestTable
+     * @see TestCluster
+     */
+    public interface ClusterTableBuilder extends 
TableBuilderBase<ClusterTableBuilder>, NestedBuilder<ClusterBuilder> {
+    }
+
+    /** Collects node names and tables, then assembles a {@link TestCluster} out of them. */
+    private static class ClusterBuilderImpl implements ClusterBuilder {
+        /** Tables added by nested table builders. */
+        private final List<TestTable> tables = new ArrayList<>();
+
+        /** Names of the nodes to create; {@code null} until {@link #nodes(String...)} is invoked. */
+        private List<String> nodeNames;
+
+        /** {@inheritDoc} */
+        @Override
+        public ClusterBuilder nodes(String... nodeNames) {
+            this.nodeNames = List.of(nodeNames);
+
+            return this;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public ClusterTableBuilder addTable() {
+            return new ClusterTableBuilderImpl(this);
+        }
+
+        /**
+         * {@inheritDoc}
+         *
+         * @throws IllegalStateException If node names were not provided via {@link #nodes(String...)}.
+         */
+        @Override
+        public TestCluster build() {
+            // Report a misconfigured builder explicitly rather than failing later with a bare NPE.
+            if (nodeNames == null) {
+                throw new IllegalStateException("Node names are not specified, call nodes(...) before build()");
+            }
+
+            var clusterService = new TestClusterService(nodeNames);
+
+            Map<String, Table> tableMap = tables.stream()
+                    .collect(Collectors.toMap(TestTable::name, Function.identity()));
+
+            // All tables of the cluster live in a single schema shared by every node.
+            var schemaManager = new PredefinedSchemaManager(new IgniteSchema("PUBLIC", tableMap, null));
+
+            Map<String, TestNode> nodes = nodeNames.stream()
+                    .map(name -> new TestNode(name, clusterService.spawnForNode(name), schemaManager))
+                    .collect(Collectors.toMap(TestNode::name, Function.identity()));
+
+            return new TestCluster(nodes);
+        }
+    }
+
+    private static class TableBuilderImpl extends 
AbstractTableBuilderImpl<TableBuilder> implements TableBuilder {
+        /** {@inheritDoc} */
+        @Override
+        public TestTable build() {
+            return new TestTable(
+                    new TableDescriptorImpl(columns, distribution), name, 
dataProviders, size
+            );
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        protected TableBuilder self() {
+            return this;
+        }
+    }
+
+    private static class ClusterTableBuilderImpl extends 
AbstractTableBuilderImpl<ClusterTableBuilder> implements ClusterTableBuilder {
+        private final ClusterBuilderImpl parent;
+
+        private ClusterTableBuilderImpl(ClusterBuilderImpl parent) {
+            this.parent = parent;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        protected ClusterTableBuilder self() {
+            return this;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public ClusterBuilder end() {
+            parent.tables.add(new TestTable(
+                    new TableDescriptorImpl(columns, distribution), name, 
dataProviders, size
+            ));
+
+            return parent;
+        }
+    }
+
+    private abstract static class AbstractTableBuilderImpl<ChildT> implements 
TableBuilderBase<ChildT> {
+        protected final List<ColumnDescriptor> columns = new ArrayList<>();
+        protected final Map<String, DataProvider<?>> dataProviders = new 
HashMap<>();
+
+        protected String name;
+        protected IgniteDistribution distribution;
+        protected int size = 100_000;
+
+        protected abstract ChildT self();
+
+        /** {@inheritDoc} */
+        @Override
+        public ChildT name(String name) {
+            this.name = name;
+
+            return self();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public ChildT distribution(IgniteDistribution distribution) {
+            this.distribution = distribution;
+
+            return self();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public ChildT addColumn(String name, NativeType type) {
+            columns.add(new ColumnDescriptorImpl(
+                    name, false, true, columns.size(), columns.size(), type, 
DefaultValueStrategy.DEFAULT_NULL, null
+            ));
+
+            return self();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public ChildT addDataProvider(String targetNode, DataProvider<?> 
dataProvider) {
+            this.dataProviders.put(targetNode, dataProvider);
+
+            return self();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public ChildT size(int size) {
+            this.size = size;
+
+            return self();
+        }
+    }
+
+    /**
+     * Base interface describing the complete set of table-related fields.
+     *
+     * <p>The sole purpose of this interface is to keep in sync both variants of table's builders.
+     *
+     * @param <ChildT> An actual type of builder that should be exposed to the user.
+     * @see ClusterTableBuilder
+     * @see TableBuilder
+     */
+    private interface TableBuilderBase<ChildT> {
+        /** Sets the name of the table. */
+        ChildT name(String name);
+
+        /** Sets the distribution of the table. */
+        ChildT distribution(IgniteDistribution distribution);
+
+        /** Adds a column to the table. */
+        ChildT addColumn(String name, NativeType type);
+
+        /**
+         * Adds a data provider for the given node to the table.
+         *
+         * <p>Providers are kept per node name, so adding another provider for the same node
+         * replaces the previous one.
+         *
+         * @param targetNode A name of the node the provider is registered for.
+         * @param dataProvider A provider of the table's data for the given node.
+         * @return {@code this} for chaining.
+         */
+        ChildT addDataProvider(String targetNode, DataProvider<?> 
dataProvider);
+
+        /** Sets the size of the table. */
+        ChildT size(int size);
+    }
+
+    /**
+     * This interface provides a nested builder with the ability to return to the previous layer.
+     *
+     * <p>For example:</p>
+     * <pre>
+     *     interface ChildBuilder extends NestedBuilder&lt;ParentBuilder&gt; {
+     *         ChildBuilder nestedFoo();
+     *     }
+     *
+     *     interface ParentBuilder {
+     *         ParentBuilder foo();
+     *         ParentBuilder bar();
+     *         ChildBuilder child();
+     *     }
+     *
+     *     Builders.parent()
+     *         .foo()
+     *         .child() // now we are dealing with the ChildBuilder
+     *             .nestedFoo()
+     *             .end() // and here we are returning back to the ParentBuilder
+     *         .bar()
+     *         .build()
+     * </pre>
+     *
+     * @param <ParentT> A type of the parent builder to return to.
+     */
+    @FunctionalInterface
+    private interface NestedBuilder<ParentT> {
+        /**
+         * Notifies the builder's chain of the nested builder that we need to return back to the
+         * previous layer.
+         *
+         * @return An instance of the parent builder.
+         */
+        ParentT end();
+    }
+}
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestCluster.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestCluster.java
new file mode 100644
index 0000000000..9a660217be
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestCluster.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.framework;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.sql.engine.exec.LifecycleAware;
+import org.apache.ignite.internal.util.IgniteUtils;
+
+/**
+ * A test cluster object.
+ *
+ * <p>A convenient holder of a collection of nodes which provides methods for centralised
+ * access and management.
+ *
+ * <p>NB: do not forget to {@link #start()} the cluster before use, and {@link #stop()} the cluster after.
+ */
+public class TestCluster implements LifecycleAware {
+    private final Map<String, TestNode> nodeByName;
+
+    TestCluster(Map<String, TestNode> nodeByName) {
+        this.nodeByName = nodeByName;
+    }
+
+    /**
+     * Looks up a node of the cluster by its name.
+     *
+     * @param name A name of the node of interest.
+     * @return A test node or {@code null} if there is no node with such name.
+     */
+    public TestNode node(String name) {
+        return nodeByName.get(name);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void start() {
+        for (TestNode node : nodeByName.values()) {
+            node.start();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void stop() throws Exception {
+        // Wrap every node into a closeable and pass all of them to closeAll in one call.
+        List<AutoCloseable> closeables = nodeByName.values().stream()
+                .<AutoCloseable>map(node -> node::stop)
+                .collect(Collectors.toList());
+
+        IgniteUtils.closeAll(closeables);
+    }
+}
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterService.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterService.java
new file mode 100644
index 0000000000..48b9981962
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterService.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.framework;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.ignite.network.AbstractMessagingService;
+import org.apache.ignite.network.AbstractTopologyService;
+import org.apache.ignite.network.ClusterLocalConfiguration;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkMessageHandler;
+import org.apache.ignite.network.NodeMetadata;
+import org.apache.ignite.network.TopologyService;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Dummy cluster service object which helps to create connected
+ * instances of {@link TopologyService} and {@link MessagingService} for
+ * connecting group of {@link TestNode} within single cluster with each other.
+ *
+ * <p>Descriptors of the cluster members are created once per cluster and shared by the topology
+ * services of all nodes, so every node observes the very same members with the very same addresses.
+ */
+class TestClusterService {
+    /** Generator of fake port numbers, makes the address of every created member unique. */
+    private static final AtomicInteger NODE_COUNTER = new AtomicInteger(1);
+
+    /** Members of the cluster by their names. */
+    private final Map<String, ClusterNode> allMembers;
+
+    private final Map<String, LocalMessagingService> messagingServicesByNode = new ConcurrentHashMap<>();
+    private final Map<String, LocalTopologyService> topologyServicesByNode = new ConcurrentHashMap<>();
+
+    /**
+     * Creates a cluster service object for given collection of nodes.
+     *
+     * @param allNodes A collection of nodes to create cluster service from.
+     */
+    TestClusterService(List<String> allNodes) {
+        // Members are created eagerly and only once: creating them per topology service would assign
+        // a different address to the same node in every node's view of the topology.
+        this.allMembers = allNodes.stream()
+                .map(TestClusterService::nodeFromName)
+                .collect(Collectors.toMap(ClusterNode::name, Function.identity()));
+    }
+
+    /**
+     * Creates a cluster service representing the given node.
+     *
+     * <p>Topology and messaging services are created lazily on the first access and then reused.
+     *
+     * @param nodeName A name of the node to create the cluster service for.
+     * @return A cluster service of the given node.
+     */
+    ClusterService spawnForNode(String nodeName) {
+        return new ClusterService() {
+            /** {@inheritDoc} */
+            @Override
+            public TopologyService topologyService() {
+                return topologyServicesByNode.computeIfAbsent(nodeName, name -> new LocalTopologyService(name, allMembers));
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            public MessagingService messagingService() {
+                return messagingServicesByNode.computeIfAbsent(nodeName, LocalMessagingService::new);
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            public ClusterLocalConfiguration localConfiguration() {
+                throw new AssertionError("Should not be called");
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            public boolean isStopped() {
+                return false;
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            public void updateMetadata(NodeMetadata metadata) {
+                throw new AssertionError("Should not be called");
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            public void start() {
+
+            }
+        };
+    }
+
+    /** Creates a member descriptor with the given name and a fake local address with a unique port. */
+    private static ClusterNode nodeFromName(String name) {
+        return new ClusterNode(name, name, NetworkAddress.from("127.0.0.1:" + NODE_COUNTER.incrementAndGet()));
+    }
+
+    /** Topology service of a single node backed by the predefined set of cluster members. */
+    private static class LocalTopologyService extends AbstractTopologyService {
+        private final ClusterNode localMember;
+        private final Map<String, ClusterNode> allMembers;
+        private final Map<NetworkAddress, ClusterNode> allMembersByAddress;
+
+        /**
+         * Constructs the object.
+         *
+         * @param localMember A name of the node this service belongs to.
+         * @param allMembers All members of the cluster by their names.
+         * @throws IllegalArgumentException If the local member is not among the given members.
+         */
+        private LocalTopologyService(String localMember, Map<String, ClusterNode> allMembers) {
+            this.allMembers = allMembers;
+            this.localMember = allMembers.get(localMember);
+
+            if (this.localMember == null) {
+                throw new IllegalArgumentException("Local member is not part of all members");
+            }
+
+            this.allMembersByAddress = new HashMap<>();
+
+            allMembers.values().forEach(member -> allMembersByAddress.put(member.address(), member));
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public ClusterNode localMember() {
+            return localMember;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public Collection<ClusterNode> allMembers() {
+            return allMembers.values();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public @Nullable ClusterNode getByAddress(NetworkAddress addr) {
+            return allMembersByAddress.get(addr);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public @Nullable ClusterNode getByConsistentId(String consistentId) {
+            return allMembers.get(consistentId);
+        }
+    }
+
+    /** Messaging service which delivers messages by invoking the handlers of the recipient directly. */
+    private class LocalMessagingService extends AbstractMessagingService {
+        private final String localNodeName;
+
+        private LocalMessagingService(String localNodeName) {
+            this.localNodeName = localNodeName;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void weakSend(ClusterNode recipient, NetworkMessage msg) {
+            throw new AssertionError("Not implemented yet");
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage msg) {
+            // The message is handled synchronously, in the thread of the sender.
+            for (var handler : messagingServicesByNode.get(recipient.name()).messageHandlers(msg.groupType())) {
+                handler.onReceived(msg, localNodeName, null);
+            }
+
+            return CompletableFuture.completedFuture(null);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public CompletableFuture<Void> respond(ClusterNode recipient, NetworkMessage msg, long correlationId) {
+            throw new AssertionError("Not implemented yet");
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public CompletableFuture<Void> respond(String recipientConsistentId, NetworkMessage msg, long correlationId) {
+            throw new AssertionError("Not implemented yet");
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, NetworkMessage msg, long timeout) {
+            throw new AssertionError("Not implemented yet");
+        }
+
+        /** Exposes handlers registered on this service to the messaging services of the other nodes. */
+        private Collection<NetworkMessageHandler> messageHandlers(short groupType) {
+            return getMessageHandlers(groupType);
+        }
+    }
+}
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
new file mode 100644
index 0000000000..cba801716e
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.framework;
+
+import static 
org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasSize;
+import static org.mockito.Mockito.mock;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.ignite.internal.sql.engine.AsyncCursor;
+import org.apache.ignite.internal.sql.engine.QueryCancel;
+import org.apache.ignite.internal.sql.engine.exec.ArrayRowHandler;
+import org.apache.ignite.internal.sql.engine.exec.ExchangeService;
+import org.apache.ignite.internal.sql.engine.exec.ExchangeServiceImpl;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionService;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionServiceImpl;
+import org.apache.ignite.internal.sql.engine.exec.LifecycleAware;
+import org.apache.ignite.internal.sql.engine.exec.LogicalRelImplementor;
+import org.apache.ignite.internal.sql.engine.exec.MailboxRegistry;
+import org.apache.ignite.internal.sql.engine.exec.MailboxRegistryImpl;
+import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
+import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
+import org.apache.ignite.internal.sql.engine.exec.rel.Node;
+import org.apache.ignite.internal.sql.engine.exec.rel.ScanNode;
+import org.apache.ignite.internal.sql.engine.message.MessageService;
+import org.apache.ignite.internal.sql.engine.message.MessageServiceImpl;
+import org.apache.ignite.internal.sql.engine.metadata.MappingServiceImpl;
+import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
+import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl;
+import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
+import 
org.apache.ignite.internal.sql.engine.prepare.ddl.DdlSqlToCommandConverter;
+import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
+import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
+import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
+import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.sql.engine.util.HashFunctionFactoryImpl;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.TopologyService;
+
+/**
+ * An object representing a node in test cluster.
+ *
+ * <p>Wires together the services required to plan and execute queries and provides
+ * convenient access to both operations.
+ */
+public class TestNode implements LifecycleAware {
+    /** Services of the node in the order of their registration. */
+    private final List<LifecycleAware> services = new ArrayList<>();
+
+    private final String nodeName;
+    private final SchemaPlus schema;
+    private final PrepareService prepareService;
+    private final ExecutionService executionService;
+
+    /**
+     * Constructs the object.
+     *
+     * @param nodeName A name of the node to create.
+     * @param clusterService A cluster service.
+     * @param schemaManager A schema manager to use for query planning and execution.
+     */
+    TestNode(
+            String nodeName,
+            ClusterService clusterService,
+            SqlSchemaManager schemaManager
+    ) {
+        this.nodeName = nodeName;
+        this.schema = schemaManager.schema("PUBLIC");
+
+        // NB: services are started in the order of registration, keep the order below intact.
+        this.prepareService = registerService(
+                new PrepareServiceImpl(nodeName, 0, mock(DdlSqlToCommandConverter.class))
+        );
+
+        TopologyService topology = clusterService.topologyService();
+        MessagingService messaging = clusterService.messagingService();
+        RowHandler<Object[]> handler = ArrayRowHandler.INSTANCE;
+
+        MailboxRegistry mailboxes = registerService(new MailboxRegistryImpl());
+        QueryTaskExecutor executor = registerService(new QueryTaskExecutorImpl(nodeName));
+
+        MessageService messages = registerService(
+                new MessageServiceImpl(topology, messaging, executor, new IgniteSpinBusyLock())
+        );
+        ExchangeService exchange = registerService(
+                new ExchangeServiceImpl(topology.localMember(), executor, mailboxes, messages)
+        );
+
+        this.executionService = registerService(new ExecutionServiceImpl<>(
+                topology.localMember(),
+                messages,
+                new MappingServiceImpl(topology),
+                schemaManager,
+                mock(DdlCommandHandler.class),
+                executor,
+                handler,
+                exchange,
+                ctx -> new LogicalRelImplementor<Object[]>(
+                        ctx,
+                        new HashFunctionFactoryImpl<>(schemaManager, handler),
+                        mailboxes,
+                        exchange
+                ) {
+                    /** Scans the rows supplied by the data provider the test table has for the local node. */
+                    @Override
+                    public Node<Object[]> visit(IgniteTableScan rel) {
+                        TestTable table = rel.getTable().unwrap(TestTable.class);
+                        DataProvider<Object[]> dataProvider = table.dataProvider(ctx.localNode().name());
+
+                        return new ScanNode<>(ctx, dataProvider);
+                    }
+                }
+        ));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void start() {
+        for (LifecycleAware service : services) {
+            service.start();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void stop() throws Exception {
+        List<AutoCloseable> closeables = services.stream()
+                .<AutoCloseable>map(service -> service::stop)
+                .collect(Collectors.toList());
+
+        // Services are stopped in the order opposite to the one they were registered in.
+        Collections.reverse(closeables);
+
+        IgniteUtils.closeAll(closeables);
+    }
+
+    /** Returns the name of the current node. */
+    public String name() {
+        return nodeName;
+    }
+
+    /**
+     * Executes given plan on a cluster this node belongs to
+     * and returns an async cursor representing the result.
+     *
+     * @param plan A plan to execute.
+     * @return A cursor representing the result.
+     */
+    public AsyncCursor<List<Object>> executePlan(QueryPlan plan) {
+        BaseQueryContext context = createContext();
+
+        return executionService.executePlan(plan, context);
+    }
+
+    /**
+     * Prepares (aka parses, validates, and optimizes) the given query string
+     * and returns the plan to execute.
+     *
+     * @param query A query string to prepare. Must contain exactly one statement.
+     * @return A plan to execute.
+     */
+    public QueryPlan prepare(String query) {
+        SqlNodeList parsed = Commons.parse(query, FRAMEWORK_CONFIG.getParserConfig());
+
+        assertThat(parsed, hasSize(1));
+
+        return prepareService.prepareAsync(parsed.get(0), createContext()).join();
+    }
+
+    /** Creates a fresh query context with the node's schema set as the default one. */
+    private BaseQueryContext createContext() {
+        var frameworkConfig = Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
+                .defaultSchema(schema)
+                .build();
+
+        return BaseQueryContext.builder()
+                .cancel(new QueryCancel())
+                .frameworkConfig(frameworkConfig)
+                .build();
+    }
+
+    /** Remembers the given service in order to manage its lifecycle and returns it back. */
+    private <T extends LifecycleAware> T registerService(T service) {
+        services.add(service);
+
+        return service;
+    }
+}
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestTable.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestTable.java
new file mode 100644
index 0000000000..02b40cd2be
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestTable.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.framework;
+
+import java.lang.reflect.Proxy;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.calcite.config.CalciteConnectionConfig;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelReferentialConstraint;
+import org.apache.calcite.rel.core.TableModify.Operation;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
+import org.apache.ignite.internal.sql.engine.metadata.ColocationGroup;
+import org.apache.ignite.internal.sql.engine.prepare.MappingQueryContext;
+import 
org.apache.ignite.internal.sql.engine.rel.logical.IgniteLogicalIndexScan;
+import 
org.apache.ignite.internal.sql.engine.rel.logical.IgniteLogicalTableScan;
+import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
+import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable;
+import org.apache.ignite.internal.sql.engine.schema.ModifyRow;
+import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
+import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
+import org.apache.ignite.internal.table.InternalTable;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * A test table that implements everything the optimizer needs in order to use it
+ * for query preparation, and that also provides access to per-node data so the table
+ * can be used in execution-related scenarios.
+ *
+ * <p>Methods that are not supported by this test implementation throw {@link AssertionError}.
+ */
+public class TestTable implements InternalIgniteTable {
+    /**
+     * A map that fails on any method invocation. It backs tables created without data providers,
+     * so that any attempt to look up a data provider of such a table ends up with an {@link AssertionError}.
+     */
+    @SuppressWarnings("unchecked") // The proxy implements Map and never returns a value, so the cast is safe.
+    private static final Map<String, DataProvider<?>> THROWING_MAP = (Map<String, DataProvider<?>>) Proxy.newProxyInstance(
+            TestTable.class.getClassLoader(), new Class<?>[] {Map.class}, (o, m, a) -> {
+                throw dataProviderNotConfigured();
+            }
+    );
+
+    private final UUID id = UUID.randomUUID();
+    private final Map<String, IgniteIndex> indexes = new HashMap<>();
+
+    private final String name;
+    private final double rowCnt;
+    private final ColocationGroup colocationGroup;
+    private final TableDescriptor descriptor;
+    private final Map<String, DataProvider<?>> dataProviders;
+
+    /**
+     * Creates a table without data providers.
+     *
+     * <p>Any attempt to get a data provider from such a table results in an {@link AssertionError}.
+     *
+     * @param descriptor A descriptor of the table.
+     * @param name A name of the table.
+     * @param colocationGroup A colocation group returned by {@link #colocationGroup(MappingQueryContext)}.
+     * @param rowCnt A row count reported by the statistic of the table.
+     */
+    public TestTable(
+            TableDescriptor descriptor,
+            String name,
+            ColocationGroup colocationGroup,
+            double rowCnt
+    ) {
+        this.descriptor = descriptor;
+        this.name = name;
+        this.rowCnt = rowCnt;
+        this.colocationGroup = colocationGroup;
+
+        dataProviders = THROWING_MAP;
+    }
+
+    /**
+     * Creates a table backed by the given data providers.
+     *
+     * <p>The colocation group of the table is built from the keys of the given map.
+     *
+     * @param descriptor A descriptor of the table.
+     * @param name A name of the table.
+     * @param dataProviders Data providers keyed by the name of the node they belong to.
+     * @param rowCnt A row count reported by the statistic of the table.
+     */
+    public TestTable(
+            TableDescriptor descriptor,
+            String name,
+            Map<String, DataProvider<?>> dataProviders,
+            double rowCnt
+    ) {
+        this.descriptor = descriptor;
+        this.name = name;
+        this.rowCnt = rowCnt;
+        this.dataProviders = dataProviders;
+
+        this.colocationGroup = ColocationGroup.forNodes(List.copyOf(dataProviders.keySet()));
+    }
+
+    /**
+     * Returns the data provider for the given node.
+     *
+     * @param nodeName Name of the node of interest.
+     * @param <RowT> A type of the rows the data provider should produce.
+     * @return A data provider for the node of interest.
+     * @throws AssertionError in case data provider is not configured for the given node.
+     */
+    <RowT> DataProvider<RowT> dataProvider(String nodeName) {
+        if (!dataProviders.containsKey(nodeName)) {
+            throw dataProviderNotConfigured();
+        }
+
+        return (DataProvider<RowT>) dataProviders.get(nodeName);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public UUID id() {
+        return id;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int version() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public IgniteLogicalTableScan toRel(
+            RelOptCluster cluster,
+            RelOptTable relOptTbl,
+            List<RelHint> hints,
+            @Nullable List<RexNode> proj,
+            @Nullable RexNode cond,
+            @Nullable ImmutableBitSet requiredColumns
+    ) {
+        return IgniteLogicalTableScan.create(cluster, cluster.traitSet(), hints, relOptTbl, proj, cond, requiredColumns);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public IgniteLogicalIndexScan toRel(
+            RelOptCluster cluster,
+            RelOptTable relOptTbl,
+            String idxName,
+            @Nullable List<RexNode> proj,
+            @Nullable RexNode cond,
+            @Nullable ImmutableBitSet requiredColumns
+    ) {
+        return IgniteLogicalIndexScan.create(cluster, cluster.traitSet(), relOptTbl, idxName, proj, cond, requiredColumns);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public RelDataType getRowType(RelDataTypeFactory typeFactory, ImmutableBitSet bitSet) {
+        return descriptor.rowType((IgniteTypeFactory) typeFactory, bitSet);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Statistic getStatistic() {
+        return new Statistic() {
+            /** {@inheritDoc} */
+            @Override
+            public Double getRowCount() {
+                return rowCnt;
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            public boolean isKey(ImmutableBitSet cols) {
+                return false;
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            public List<ImmutableBitSet> getKeys() {
+                throw new AssertionError();
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            public List<RelReferentialConstraint> getReferentialConstraints() {
+                throw new AssertionError();
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            public List<RelCollation> getCollations() {
+                return Collections.emptyList();
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            public RelDistribution getDistribution() {
+                throw new AssertionError();
+            }
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Schema.TableType getJdbcTableType() {
+        throw new AssertionError();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean isRolledUp(String col) {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean rolledUpColumnValidInsideAgg(
+            String column,
+            SqlCall call,
+            SqlNode parent,
+            CalciteConnectionConfig config
+    ) {
+        throw new AssertionError();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public ColocationGroup colocationGroup(MappingQueryContext ctx) {
+        return colocationGroup;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public IgniteDistribution distribution() {
+        return descriptor.distribution();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public TableDescriptor descriptor() {
+        return descriptor;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Map<String, IgniteIndex> indexes() {
+        return Collections.unmodifiableMap(indexes);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void addIndex(IgniteIndex idxTbl) {
+        indexes.put(idxTbl.name(), idxTbl);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public IgniteIndex getIndex(String idxName) {
+        return indexes.get(idxName);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void removeIndex(String idxName) {
+        throw new AssertionError();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String name() {
+        return name;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public InternalTable table() {
+        throw new AssertionError();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <RowT> RowT toRow(ExecutionContext<RowT> ectx, BinaryRow row, RowFactory<RowT> factory,
+            @Nullable BitSet requiredColumns) {
+        throw new AssertionError();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <RowT> ModifyRow toModifyRow(ExecutionContext<RowT> ectx, RowT row, Operation op, @Nullable List<String> arg) {
+        throw new AssertionError();
+    }
+
+    /**
+     * Creates the error reported when a data provider is requested but has not been configured.
+     *
+     * <p>A new instance is created on every call: a throwable captures its stack trace at construction time,
+     * so a shared static instance would always point at the class initialization instead of the failing access.
+     *
+     * @return A new error to throw.
+     */
+    private static AssertionError dataProviderNotConfigured() {
+        return new AssertionError("DataProvider not configured");
+    }
+}
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
index 9d6fb2e7b1..3b4040bcd6 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
@@ -42,7 +42,6 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Flow.Publisher;
 import java.util.function.BiFunction;
-import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -95,6 +94,7 @@ import org.apache.ignite.internal.schema.NativeType;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
 import org.apache.ignite.internal.sql.engine.externalize.RelJsonReader;
+import org.apache.ignite.internal.sql.engine.framework.PredefinedSchemaManager;
 import org.apache.ignite.internal.sql.engine.metadata.ColocationGroup;
 import org.apache.ignite.internal.sql.engine.prepare.Cloner;
 import org.apache.ignite.internal.sql.engine.prepare.Fragment;
@@ -112,10 +112,8 @@ import 
org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
 import org.apache.ignite.internal.sql.engine.schema.DefaultValueStrategy;
 import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
 import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
-import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
 import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable;
 import org.apache.ignite.internal.sql.engine.schema.ModifyRow;
-import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
 import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
 import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
@@ -666,17 +664,8 @@ public abstract class AbstractPlannerTest extends 
IgniteAbstractTest {
 
         List<RelNode> deserializedNodes = new ArrayList<>();
 
-        Map<UUID, IgniteTable> tableMap = new HashMap<>();
-
-        for (IgniteSchema schema : schemas) {
-            tableMap.putAll(schema.getTableNames().stream()
-                    .map(schema::getTable)
-                    .map(IgniteTable.class::cast)
-                    .collect(Collectors.toMap(IgniteTable::id, 
Function.identity())));
-        }
-
         for (String s : serialized) {
-            RelJsonReader reader = new RelJsonReader(new 
SqlSchemaManagerImpl(tableMap));
+            RelJsonReader reader = new RelJsonReader(new 
PredefinedSchemaManager(schemas));
             deserializedNodes.add(reader.read(s));
         }
 
@@ -1149,24 +1138,6 @@ public abstract class AbstractPlannerTest extends 
IgniteAbstractTest {
         }
     }
 
-    static class SqlSchemaManagerImpl implements SqlSchemaManager {
-        private final Map<UUID, IgniteTable> tablesById;
-
-        public SqlSchemaManagerImpl(Map<UUID, IgniteTable> tablesById) {
-            this.tablesById = tablesById;
-        }
-
-        @Override
-        public SchemaPlus schema(@Nullable String schema) {
-            throw new AssertionError();
-        }
-
-        @Override
-        public IgniteTable tableById(UUID id, int ver) {
-            return tablesById.get(id);
-        }
-    }
-
     static class TestSortedIndex implements SortedIndex {
         private final UUID id = UUID.randomUUID();
 

Reply via email to