This is an automated email from the ASF dual-hosted git repository. korlov pushed a commit to branch ignite-17275 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 13102dcb379f8cb08f8ce562411d8bf39d677cc0 Author: korlov42 <[email protected]> AuthorDate: Fri Dec 23 11:49:09 2022 +0200 IGNITE-17275 Sql. Performance testing of pure SQL execution --- modules/sql-engine/pom.xml | 25 ++ .../internal/sql/engine/SqlQueryProcessor.java | 10 +- .../sql/engine/exec/ExecutionServiceImpl.java | 28 +- .../sql/engine/schema/ColumnDescriptorImpl.java | 3 +- .../internal/sql/engine/schema/IgniteSchema.java | 13 +- .../sql/engine/benchmarks/SqlBenchmark.java | 152 ++++++++++ .../sql/engine/exec/ExecutionServiceImplTest.java | 40 +-- .../sql/engine/framework/DataProvider.java | 43 +++ .../engine/framework/PredefinedSchemaManager.java | 80 ++++++ .../sql/engine/framework/TestBuilders.java | 291 ++++++++++++++++++++ .../internal/sql/engine/framework/TestCluster.java | 66 +++++ .../sql/engine/framework/TestClusterService.java | 199 ++++++++++++++ .../internal/sql/engine/framework/TestNode.java | 199 ++++++++++++++ .../internal/sql/engine/framework/TestTable.java | 305 +++++++++++++++++++++ .../sql/engine/planner/AbstractPlannerTest.java | 33 +-- 15 files changed, 1414 insertions(+), 73 deletions(-) diff --git a/modules/sql-engine/pom.xml b/modules/sql-engine/pom.xml index 6ad7ca90dc..4066df87cf 100644 --- a/modules/sql-engine/pom.xml +++ b/modules/sql-engine/pom.xml @@ -208,6 +208,19 @@ <artifactId>slf4j-jdk14</artifactId> <scope>test</scope> </dependency> + + <!-- Benchmark dependencies --> + <dependency> + <groupId>org.openjdk.jmh</groupId> + <artifactId>jmh-core</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.openjdk.jmh</groupId> + <artifactId>jmh-generator-annprocess</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> @@ -313,6 +326,12 @@ <artifactId>value</artifactId> <version>${immutables.version}</version> </dependency> + + <dependency> + <groupId>org.openjdk.jmh</groupId> + <artifactId>jmh-generator-annprocess</artifactId> + <version>${jmh.framework.version}</version> + 
</dependency> </dependencies> <configuration> <!-- <compilerArgs> @@ -330,6 +349,12 @@ <artifactId>value</artifactId> <version>${immutables.version}</version> </path> + + <path> + <groupId>org.openjdk.jmh</groupId> + <artifactId>jmh-generator-annprocess</artifactId> + <version>${jmh.framework.version}</version> + </path> </annotationProcessorPaths> </configuration> </plugin> diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java index 0ce7773d9e..eecaf29519 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java @@ -62,6 +62,7 @@ import org.apache.ignite.internal.sql.engine.exec.LifecycleAware; import org.apache.ignite.internal.sql.engine.exec.MailboxRegistryImpl; import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor; import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl; +import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler; import org.apache.ignite.internal.sql.engine.message.MessageServiceImpl; import org.apache.ignite.internal.sql.engine.prepare.PrepareService; import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl; @@ -210,18 +211,17 @@ public class SqlQueryProcessor implements QueryProcessor { this.prepareSvc = prepareSvc; + var ddlCommandHandler = new DdlCommandHandler(distributionZoneManager, tableManager, indexManager, dataStorageManager); + var executionSrvc = registerService(ExecutionServiceImpl.create( clusterSrvc.topologyService(), msgSrvc, sqlSchemaManager, - distributionZoneManager, - tableManager, - indexManager, + ddlCommandHandler, taskExecutor, ArrayRowHandler.INSTANCE, mailboxRegistry, - exchangeService, - dataStorageManager + exchangeService )); 
clusterSrvc.topologyService().addEventHandler(executionSrvc); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java index 37a243efce..9b4d7027eb 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java @@ -39,9 +39,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import org.apache.calcite.tools.Frameworks; import org.apache.ignite.configuration.ConfigurationChangeException; -import org.apache.ignite.internal.distributionzones.DistributionZoneManager; import org.apache.ignite.internal.hlc.HybridTimestamp; -import org.apache.ignite.internal.index.IndexManager; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.sql.engine.AsyncCursor; @@ -72,8 +70,6 @@ import org.apache.ignite.internal.sql.engine.util.BaseQueryContext; import org.apache.ignite.internal.sql.engine.util.Commons; import org.apache.ignite.internal.sql.engine.util.HashFunctionFactoryImpl; import org.apache.ignite.internal.sql.engine.util.TypeUtils; -import org.apache.ignite.internal.storage.DataStorageManager; -import org.apache.ignite.internal.table.distributed.TableManager; import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.lang.IgniteInternalCheckedException; import org.apache.ignite.lang.IgniteInternalException; @@ -117,13 +113,11 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve * @param topSrvc Topology service. * @param msgSrvc Message service. * @param sqlSchemaManager Schema manager. - * @param indexManager Index manager. - * @param tblManager Table manager. 
+ * @param ddlCommandHandler Handler of the DDL commands. * @param taskExecutor Task executor. * @param handler Row handler. * @param mailboxRegistry Mailbox registry. * @param exchangeSrvc Exchange service. - * @param dataStorageManager Storage manager. * @param <RowT> Type of the sql row. * @return An execution service. */ @@ -131,21 +125,18 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve TopologyService topSrvc, MessageService msgSrvc, SqlSchemaManager sqlSchemaManager, - DistributionZoneManager distributionZoneManager, - TableManager tblManager, - IndexManager indexManager, + DdlCommandHandler ddlCommandHandler, QueryTaskExecutor taskExecutor, RowHandler<RowT> handler, MailboxRegistry mailboxRegistry, - ExchangeService exchangeSrvc, - DataStorageManager dataStorageManager + ExchangeService exchangeSrvc ) { return new ExecutionServiceImpl<>( topSrvc.localMember(), msgSrvc, new MappingServiceImpl(topSrvc), sqlSchemaManager, - new DdlCommandHandler(distributionZoneManager, tblManager, indexManager, dataStorageManager), + ddlCommandHandler, taskExecutor, handler, exchangeSrvc, @@ -161,7 +152,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve /** * Constructor. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859 */ - ExecutionServiceImpl( + public ExecutionServiceImpl( ClusterNode localNode, MessageService msgSrvc, MappingService mappingSrvc, @@ -718,8 +709,15 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve } } + /** + * A factory of the relational node implementors. + * + * @param <RowT> A type of the row the execution tree will be working with. + * @see LogicalRelImplementor + */ @FunctionalInterface - interface ImplementorFactory<RowT> { + public interface ImplementorFactory<RowT> { + /** Creates the relational node implementor with the given context. 
*/ LogicalRelImplementor<RowT> create(ExecutionContext<RowT> ctx); } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/ColumnDescriptorImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/ColumnDescriptorImpl.java index 76f56d23a3..2dcc455ecb 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/ColumnDescriptorImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/ColumnDescriptorImpl.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.sql.engine.schema; import java.util.Objects; import java.util.function.Supplier; import org.apache.ignite.internal.schema.NativeType; +import org.jetbrains.annotations.Nullable; /** * Simple implementation of {@link ColumnDescriptor}. @@ -65,7 +66,7 @@ public class ColumnDescriptorImpl implements ColumnDescriptor { int physicalIndex, NativeType type, DefaultValueStrategy defaultStrategy, - Supplier<Object> dfltVal + @Nullable Supplier<Object> dfltVal ) { this.key = key; this.nullable = nullable; diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java index 52aca0781d..cd64f671a6 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java @@ -23,6 +23,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import org.apache.calcite.schema.Table; import org.apache.calcite.schema.impl.AbstractSchema; +import org.jetbrains.annotations.Nullable; /** * Ignite schema. @@ -38,11 +39,17 @@ public class IgniteSchema extends AbstractSchema { * Creates a Schema. * * @param schemaName Schema name. + * @param tableMap Tables of the schema; {@code null} is treated as an empty map. + * @param indexMap Indexes of the schema; {@code null} is treated as an empty map.
*/ - public IgniteSchema(String schemaName, Map<String, Table> tblMap, Map<UUID, IgniteIndex> idxMap) { + public IgniteSchema( + String schemaName, + @Nullable Map<String, Table> tableMap, + @Nullable Map<UUID, IgniteIndex> indexMap + ) { this.schemaName = schemaName; - this.tblMap = tblMap == null ? new ConcurrentHashMap<>() : new ConcurrentHashMap<>(tblMap); - this.idxMap = idxMap == null ? new ConcurrentHashMap<>() : new ConcurrentHashMap<>(idxMap); + this.tblMap = tableMap == null ? new ConcurrentHashMap<>() : new ConcurrentHashMap<>(tableMap); + this.idxMap = indexMap == null ? new ConcurrentHashMap<>() : new ConcurrentHashMap<>(indexMap); } /** diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/SqlBenchmark.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/SqlBenchmark.java new file mode 100644 index 0000000000..7aca579f3e --- /dev/null +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/SqlBenchmark.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.sql.engine.benchmarks; + +import static org.apache.ignite.internal.testframework.IgniteTestUtils.await; + +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.internal.schema.NativeTypes; +import org.apache.ignite.internal.sql.engine.framework.DataProvider; +import org.apache.ignite.internal.sql.engine.framework.TestBuilders; +import org.apache.ignite.internal.sql.engine.framework.TestCluster; +import org.apache.ignite.internal.sql.engine.framework.TestNode; +import org.apache.ignite.internal.sql.engine.prepare.QueryPlan; +import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +/** + * A micro-benchmark of sql execution. 
+ */ +@Warmup(iterations = 50, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 20, time = 1, timeUnit = TimeUnit.SECONDS) +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(1) +@State(Scope.Benchmark) +public class SqlBenchmark { + private final DataProvider<Object[]> dataProvider = new SameRowDataProvider( + new Object[] {42, UUID.randomUUID().toString()}, 333 + ); + + // @formatter:off + private final TestCluster cluster = TestBuilders.cluster() + .nodes("N1", "N2", "N3") + .addTable() + .name("T1") + .distribution(IgniteDistributions.hash(List.of(0))) + .addColumn("ID", NativeTypes.INT32) + .addColumn("VAL", NativeTypes.stringOf(64)) + .addDataProvider("N1", dataProvider) + .addDataProvider("N2", dataProvider) + .addDataProvider("N3", dataProvider) + .end() + .build(); + // @formatter:on + + private final TestNode gatewayNode = cluster.node("N1"); + + private QueryPlan plan; + + /** Starts the cluster and prepares the plan of the query. */ + @Setup + public void setUp() { + cluster.start(); + + plan = gatewayNode.prepare("SELECT * FROM t1"); + } + + /** Stops the cluster. */ + @TearDown + public void tearDown() throws Exception { + cluster.stop(); + } + + /** Very simple test to measure performance of minimal possible distributed query. */ + @Benchmark + public void selectAllSimple(Blackhole bh) { + for (var row : await(gatewayNode.executePlan(plan).requestNextAsync(1_000)).items()) { + bh.consume(row); + } + } + + /** + * Runs the benchmark. 
+ * + * @param args args + * @throws Exception if something goes wrong + */ + public static void main(String[] args) throws Exception { + Options build = new OptionsBuilder() + //.addProfiler("gc") + .include(SqlBenchmark.class.getName()) + .build(); + + new Runner(build).run(); + } + + private static class SameRowDataProvider implements DataProvider<Object[]> { + private final int times; + private final Object[] row; + + SameRowDataProvider(Object[] row, int times) { + this.times = times; + this.row = row; + } + + @Override + public Iterator<Object[]> iterator() { + return new Iterator<>() { + private int counter; + + @Override + public boolean hasNext() { + return counter < times; + } + + @Override + public Object[] next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + counter++; + + return row; + } + }; + } + } +} diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java index 92e963db86..49771678a9 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.sql.engine.exec; -import static org.apache.ignite.internal.sql.engine.util.BaseQueryContext.CLUSTER; import static org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG; import static org.apache.ignite.internal.testframework.IgniteTestUtils.await; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; @@ -35,6 +34,7 @@ import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Queue; @@ -45,18 
+45,20 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; -import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.tools.Frameworks; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.schema.NativeType; +import org.apache.ignite.internal.schema.NativeTypes; import org.apache.ignite.internal.sql.engine.AsyncCursor.BatchedResult; import org.apache.ignite.internal.sql.engine.QueryCancel; import org.apache.ignite.internal.sql.engine.exec.ExecutionServiceImplTest.TestCluster.TestNode; import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler; import org.apache.ignite.internal.sql.engine.exec.rel.Node; import org.apache.ignite.internal.sql.engine.exec.rel.ScanNode; +import org.apache.ignite.internal.sql.engine.framework.TestTable; import org.apache.ignite.internal.sql.engine.message.ExecutionContextAwareMessage; import org.apache.ignite.internal.sql.engine.message.MessageListener; import org.apache.ignite.internal.sql.engine.message.MessageService; @@ -64,14 +66,16 @@ import org.apache.ignite.internal.sql.engine.message.QueryStartRequest; import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesFactory; import org.apache.ignite.internal.sql.engine.metadata.ColocationGroup; import org.apache.ignite.internal.sql.engine.metadata.RemoteException; -import org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest.TestTable; -import org.apache.ignite.internal.sql.engine.prepare.MappingQueryContext; import org.apache.ignite.internal.sql.engine.prepare.PrepareService; import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl; import org.apache.ignite.internal.sql.engine.prepare.QueryPlan; import 
org.apache.ignite.internal.sql.engine.rel.IgniteTableScan; +import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor; +import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptorImpl; +import org.apache.ignite.internal.sql.engine.schema.DefaultValueStrategy; import org.apache.ignite.internal.sql.engine.schema.IgniteSchema; import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager; +import org.apache.ignite.internal.sql.engine.schema.TableDescriptorImpl; import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution; import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions; import org.apache.ignite.internal.sql.engine.util.BaseQueryContext; @@ -108,7 +112,7 @@ public class ExecutionServiceImplTest { ); private final TestTable table = createTable("TEST_TBL", 1_000_000, IgniteDistributions.random(), - "ID", Integer.class, "VAL", Integer.class); + "ID", NativeTypes.INT32, "VAL", NativeTypes.INT32); private final IgniteSchema schema = new IgniteSchema("PUBLIC", Map.of(table.name(), table), null); @@ -588,22 +592,22 @@ public class ExecutionServiceImplTest { throw new IllegalArgumentException("'fields' should be non-null array with even number of elements"); } - RelDataTypeFactory.Builder b = new RelDataTypeFactory.Builder(CLUSTER.getTypeFactory()); + List<ColumnDescriptor> columns = new ArrayList<>(); for (int i = 0; i < fields.length; i += 2) { - b.add((String) fields[i], CLUSTER.getTypeFactory().createJavaType((Class<?>) fields[i + 1])); + columns.add( + new ColumnDescriptorImpl( + (String) fields[i], false, true, i, i, + (NativeType) fields[i + 1], DefaultValueStrategy.DEFAULT_NULL, null + ) + ); } - return new TestTable(name, b.build(), size) { - @Override - public IgniteDistribution distribution() { - return distr; - } - - @Override - public ColocationGroup colocationGroup(MappingQueryContext ctx) { - return ColocationGroup.forNodes(nodeNames); - } - }; + return new TestTable( + new TableDescriptorImpl(columns, 
distr), + name, + ColocationGroup.forNodes(nodeNames), + size + ); } } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/DataProvider.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/DataProvider.java new file mode 100644 index 0000000000..a143f4c6ae --- /dev/null +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/DataProvider.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine.framework; + +import java.util.Collection; + +/** + * Producer of the rows to use with {@link TestTable} in execution-related scenarios. + * + * <p>A data provider is supposed to be created for table on per-node basis. It's up + * to developer to keep data provider in sync with the schema of the table this data provider relates to. + * + * @param <T> A type of the produced elements. + * @see TestTable + */ +@FunctionalInterface +public interface DataProvider<T> extends Iterable<T> { + /** + * Creates data provider from given collection. + * + * @param collection Collection to use as source of data. + * @param <T> A type of the produced elements. 
+ * @return A data provider instance backed by given collection. + */ + static <T> DataProvider<T> fromCollection(Collection<T> collection) { + return collection::iterator; + } +} diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java new file mode 100644 index 0000000000..bd9e9bc4ce --- /dev/null +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.sql.engine.framework; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.tools.Frameworks; +import org.apache.ignite.internal.sql.engine.schema.IgniteSchema; +import org.apache.ignite.internal.sql.engine.schema.IgniteTable; +import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager; +import org.jetbrains.annotations.Nullable; + +/** + * Dummy wrapper for predefined collection of schemas. + * + * <p>Accepts collection of {@link IgniteSchema schemas} as parameter and implements required + * methods of {@link SqlSchemaManager} around them. Assumes given schemas will never be changed. + * + * @see IgniteSchema + * @see SqlSchemaManager + */ +public class PredefinedSchemaManager implements SqlSchemaManager { + private final SchemaPlus root; + private final Map<UUID, IgniteTable> tableById; + + /** Constructs schema manager from a single schema. */ + public PredefinedSchemaManager(IgniteSchema schema) { + this(List.of(schema)); + } + + /** Constructs schema manager from a collection of schemas. */ + public PredefinedSchemaManager(Collection<IgniteSchema> schemas) { + this.root = Frameworks.createRootSchema(false); + this.tableById = new HashMap<>(); + + for (IgniteSchema schema : schemas) { + root.add(schema.getName(), schema); + + tableById.putAll( + schema.getTableNames().stream() + .map(schema::getTable) + .map(IgniteTable.class::cast) + .collect(Collectors.toMap(IgniteTable::id, Function.identity())) + ); + } + } + + /** {@inheritDoc} */ + @Override + public SchemaPlus schema(@Nullable String schema) { + return schema == null ? 
root : root.getSubSchema(schema); + } + + /** {@inheritDoc} */ + @Override + public IgniteTable tableById(UUID id, int ver) { + return tableById.get(id); + } +} diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java new file mode 100644 index 0000000000..23aaa6bcb1 --- /dev/null +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.sql.engine.framework; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.calcite.schema.Table; +import org.apache.ignite.internal.schema.NativeType; +import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor; +import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptorImpl; +import org.apache.ignite.internal.sql.engine.schema.DefaultValueStrategy; +import org.apache.ignite.internal.sql.engine.schema.IgniteSchema; +import org.apache.ignite.internal.sql.engine.schema.TableDescriptorImpl; +import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution; + +/** + * A collection of builders to create test objects. + */ +public class TestBuilders { + /** Returns a builder of the test cluster object. */ + public static ClusterBuilder cluster() { + return new ClusterBuilderImpl(); + } + + /** Returns a builder of the test table object. */ + public static TableBuilder table() { + return new TableBuilderImpl(); + } + + /** + * A builder to create a test cluster object. + * + * @see TestCluster + */ + public interface ClusterBuilder { + /** + * Sets desired names for the cluster nodes. + * + * @param nodeNames An array of node names to create cluster from. + * @return {@code this} for chaining. + */ + ClusterBuilder nodes(String... nodeNames); + + /** + * Creates a table builder to add to the cluster. + * + * @return An instance of table builder. + */ + ClusterTableBuilder addTable(); + + /** + * Builds the cluster object. + * + * @return Created cluster object. + */ + TestCluster build(); + } + + /** + * A builder to create a test table object. + * + * @see TestTable + */ + public interface TableBuilder extends TableBuilderBase<TableBuilder> { + /** + * Builds a table. + * + * @return Created table object. 
+ */ + public TestTable build(); + } + + /** + * A builder to create a test table as a nested object of the cluster. + * + * @see TestTable + * @see TestCluster + */ + public interface ClusterTableBuilder extends TableBuilderBase<ClusterTableBuilder>, NestedBuilder<ClusterBuilder> { + } + + private static class ClusterBuilderImpl implements ClusterBuilder { + private final List<TestTable> tables = new ArrayList<>(); + private List<String> nodeNames; + + @Override + public ClusterBuilder nodes(String... nodeNames) { + this.nodeNames = List.of(nodeNames); + + return this; + } + + @Override + public ClusterTableBuilder addTable() { + return new ClusterTableBuilderImpl(this); + } + + @Override + public TestCluster build() { + var clusterService = new TestClusterService(nodeNames); + + Map<String, Table> tableMap = tables.stream() + .collect(Collectors.toMap(TestTable::name, Function.identity())); + + var schemaManager = new PredefinedSchemaManager(new IgniteSchema("PUBLIC", tableMap, null)); + + Map<String, TestNode> nodes = nodeNames.stream() + .map(name -> new TestNode(name, clusterService.spawnForNode(name), schemaManager)) + .collect(Collectors.toMap(TestNode::name, Function.identity())); + + return new TestCluster(nodes); + } + + } + + private static class TableBuilderImpl extends AbstractTableBuilderImpl<TableBuilder> implements TableBuilder { + /** {@inheritDoc} */ + @Override + public TestTable build() { + return new TestTable( + new TableDescriptorImpl(columns, distribution), name, dataProviders, size + ); + } + + /** {@inheritDoc} */ + @Override + protected TableBuilder self() { + return this; + } + } + + private static class ClusterTableBuilderImpl extends AbstractTableBuilderImpl<ClusterTableBuilder> implements ClusterTableBuilder { + private final ClusterBuilderImpl parent; + + private ClusterTableBuilderImpl(ClusterBuilderImpl parent) { + this.parent = parent; + } + + /** {@inheritDoc} */ + @Override + protected ClusterTableBuilder self() { + return this; 
+ } + + /** {@inheritDoc} */ + @Override + public ClusterBuilder end() { + parent.tables.add(new TestTable( + new TableDescriptorImpl(columns, distribution), name, dataProviders, size + )); + + return parent; + } + } + + private abstract static class AbstractTableBuilderImpl<ChildT> implements TableBuilderBase<ChildT> { + protected final List<ColumnDescriptor> columns = new ArrayList<>(); + protected final Map<String, DataProvider<?>> dataProviders = new HashMap<>(); + + protected String name; + protected IgniteDistribution distribution; + protected int size = 100_000; + + protected abstract ChildT self(); + + /** {@inheritDoc} */ + @Override + public ChildT name(String name) { + this.name = name; + + return self(); + } + + /** {@inheritDoc} */ + @Override + public ChildT distribution(IgniteDistribution distribution) { + this.distribution = distribution; + + return self(); + } + + /** {@inheritDoc} */ + @Override + public ChildT addColumn(String name, NativeType type) { + columns.add(new ColumnDescriptorImpl( + name, false, true, columns.size(), columns.size(), type, DefaultValueStrategy.DEFAULT_NULL, null + )); + + return self(); + } + + /** {@inheritDoc} */ + @Override + public ChildT addDataProvider(String targetNode, DataProvider<?> dataProvider) { + this.dataProviders.put(targetNode, dataProvider); + + return self(); + } + + /** {@inheritDoc} */ + @Override + public ChildT size(int size) { + this.size = size; + + return self(); + } + } + + /** + * Base interface describing the complete set of table-related fields. + * + * <p>The sole purpose of this interface is to keep in sync both variants of table's builders. + * + * @param <ChildT> An actual type of builder that should be exposed to the user. + * @see ClusterTableBuilder + * @see TableBuilder + */ + private interface TableBuilderBase<ChildT> { + /** Sets the name of the table. */ + ChildT name(String name); + + /** Sets the distribution of the table. 
*/ + ChildT distribution(IgniteDistribution distribution); + + /** Adds a column to the table. */ + ChildT addColumn(String name, NativeType type); + + /** + * Adds a data provider for the given node to the table. + * The provider is the source of rows when the table is scanned on that node. + */ + ChildT addDataProvider(String targetNode, DataProvider<?> dataProvider); + + /** Sets the size of the table. */ + ChildT size(int size); + } + + /** + * This interface provides a nested builder with the ability to return to the previous layer. + * + * <p>For example:</p> + * <pre> + * interface ChildBuilder extends NestedBuilder&lt;ParentBuilder&gt; { + * ChildBuilder nestedFoo(); + * } + * + * interface ParentBuilder { + * ParentBuilder foo(); + * ParentBuilder bar(); + * ChildBuilder child(); + * } + * + * Builders.parent() + * .foo() + * .child() // now we are dealing with the ChildBuilder + * .nestedFoo() + * .end() // and here we are returning to the ParentBuilder + * .bar() + * .build() + * </pre> + */ + @FunctionalInterface + private interface NestedBuilder<ParentT> { + /** + * Notifies the nested builder's chain that we need to return to the + * previous layer. + * + * @return An instance of the parent builder. + */ + ParentT end(); + } +} diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestCluster.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestCluster.java new file mode 100644 index 0000000000..9a660217be --- /dev/null +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestCluster.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine.framework; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.ignite.internal.sql.engine.exec.LifecycleAware; +import org.apache.ignite.internal.util.IgniteUtils; + +/** + * A test cluster object. + * + * <p>This is a convenient holder of a collection of nodes which provides methods for centralised + * access and management. + * + * <p>NB: do not forget to {@link #start()} the cluster before use, and {@link #stop()} the cluster after. + */ +public class TestCluster implements LifecycleAware { + private final Map<String, TestNode> nodeByName; + + TestCluster(Map<String, TestNode> nodeByName) { + this.nodeByName = nodeByName; + } + + /** + * Returns the node for the given name, if it exists. + * + * @param name A name of the node of interest. + * @return A test node or {@code null} if there is no node with such name. 
+ */ + public TestNode node(String name) { + return nodeByName.get(name); + } + + /** {@inheritDoc} */ + @Override + public void start() { + nodeByName.values().forEach(TestNode::start); + } + + /** {@inheritDoc} */ + @Override + public void stop() throws Exception { + List<AutoCloseable> closeables = nodeByName.values().stream() + .map(node -> ((AutoCloseable) node::stop)) + .collect(Collectors.toList()); + + IgniteUtils.closeAll(closeables); + } +} diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterService.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterService.java new file mode 100644 index 0000000000..48b9981962 --- /dev/null +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterService.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.sql.engine.framework; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.ignite.network.AbstractMessagingService; +import org.apache.ignite.network.AbstractTopologyService; +import org.apache.ignite.network.ClusterLocalConfiguration; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.network.ClusterService; +import org.apache.ignite.network.MessagingService; +import org.apache.ignite.network.NetworkAddress; +import org.apache.ignite.network.NetworkMessage; +import org.apache.ignite.network.NetworkMessageHandler; +import org.apache.ignite.network.NodeMetadata; +import org.apache.ignite.network.TopologyService; +import org.jetbrains.annotations.Nullable; + +/** + * Dummy cluster service object which helps to create connected + * instances of {@link TopologyService} and {@link MessagingService} for + * connecting a group of {@link TestNode}s within a single cluster with each other. + */ +class TestClusterService { + private final List<String> allNodes; + + private final Map<String, LocalMessagingService> messagingServicesByNode = new ConcurrentHashMap<>(); + private final Map<String, LocalTopologyService> topologyServicesByNode = new ConcurrentHashMap<>(); + + /** + * Creates a cluster service object for the given collection of nodes. + * + * @param allNodes A collection of nodes to create cluster service from. 
+ */ + TestClusterService(List<String> allNodes) { + this.allNodes = allNodes; + } + + ClusterService spawnForNode(String nodeName) { + return new ClusterService() { + /** {@inheritDoc} */ + @Override + public TopologyService topologyService() { + return topologyServicesByNode.computeIfAbsent(nodeName, name -> new LocalTopologyService(name, allNodes)); + } + + /** {@inheritDoc} */ + @Override + public MessagingService messagingService() { + return messagingServicesByNode.computeIfAbsent(nodeName, LocalMessagingService::new); + } + + /** {@inheritDoc} */ + @Override + public ClusterLocalConfiguration localConfiguration() { + throw new AssertionError("Should not be called"); + } + + /** {@inheritDoc} */ + @Override + public boolean isStopped() { + return false; + } + + /** {@inheritDoc} */ + @Override + public void updateMetadata(NodeMetadata metadata) { + throw new AssertionError("Should not be called"); + } + + /** {@inheritDoc} */ + @Override + public void start() { + + } + }; + } + + private static class LocalTopologyService extends AbstractTopologyService { + private static final AtomicInteger NODE_COUNTER = new AtomicInteger(1); + + private final ClusterNode localMember; + private final Map<String, ClusterNode> allMembers; + private final Map<NetworkAddress, ClusterNode> allMembersByAddress; + + private LocalTopologyService(String localMember, List<String> allMembers) { + this.allMembers = allMembers.stream() + .map(LocalTopologyService::nodeFromName) + .collect(Collectors.toMap(ClusterNode::name, Function.identity())); + + this.localMember = this.allMembers.get(localMember); + + if (this.localMember == null) { + throw new IllegalArgumentException("Local member is not part of all members"); + } + + this.allMembersByAddress = new HashMap<>(); + + this.allMembers.forEach((ignored, member) -> allMembersByAddress.put(member.address(), member)); + } + + private static ClusterNode nodeFromName(String name) { + return new ClusterNode(name, name, 
NetworkAddress.from("127.0.0.1:" + NODE_COUNTER.incrementAndGet())); + } + + /** {@inheritDoc} */ + @Override + public ClusterNode localMember() { + return localMember; + } + + /** {@inheritDoc} */ + @Override + public Collection<ClusterNode> allMembers() { + return allMembers.values(); + } + + /** {@inheritDoc} */ + @Override + public @Nullable ClusterNode getByAddress(NetworkAddress addr) { + return allMembersByAddress.get(addr); + } + + /** {@inheritDoc} */ + @Override + public @Nullable ClusterNode getByConsistentId(String consistentId) { + return allMembers.get(consistentId); + } + } + + private class LocalMessagingService extends AbstractMessagingService { + private final String localNodeName; + + private LocalMessagingService(String localNodeName) { + this.localNodeName = localNodeName; + } + + /** {@inheritDoc} */ + @Override + public void weakSend(ClusterNode recipient, NetworkMessage msg) { + throw new AssertionError("Not implemented yet"); + } + + /** {@inheritDoc} */ + @Override + public CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage msg) { + for (var handler : messagingServicesByNode.get(recipient.name()).messageHandlers(msg.groupType())) { + handler.onReceived(msg, localNodeName, null); + } + + return CompletableFuture.completedFuture(null); + } + + /** {@inheritDoc} */ + @Override + public CompletableFuture<Void> respond(ClusterNode recipient, NetworkMessage msg, long correlationId) { + throw new AssertionError("Not implemented yet"); + } + + /** {@inheritDoc} */ + @Override + public CompletableFuture<Void> respond(String recipientConsistentId, NetworkMessage msg, long correlationId) { + throw new AssertionError("Not implemented yet"); + } + + /** {@inheritDoc} */ + @Override + public CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, NetworkMessage msg, long timeout) { + throw new AssertionError("Not implemented yet"); + } + + private Collection<NetworkMessageHandler> messageHandlers(short groupType) { + return 
getMessageHandlers(groupType); + } + } +} diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java new file mode 100644 index 0000000000..cba801716e --- /dev/null +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.sql.engine.framework; + +import static org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasSize; +import static org.mockito.Mockito.mock; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.tools.Frameworks; +import org.apache.ignite.internal.sql.engine.AsyncCursor; +import org.apache.ignite.internal.sql.engine.QueryCancel; +import org.apache.ignite.internal.sql.engine.exec.ArrayRowHandler; +import org.apache.ignite.internal.sql.engine.exec.ExchangeService; +import org.apache.ignite.internal.sql.engine.exec.ExchangeServiceImpl; +import org.apache.ignite.internal.sql.engine.exec.ExecutionService; +import org.apache.ignite.internal.sql.engine.exec.ExecutionServiceImpl; +import org.apache.ignite.internal.sql.engine.exec.LifecycleAware; +import org.apache.ignite.internal.sql.engine.exec.LogicalRelImplementor; +import org.apache.ignite.internal.sql.engine.exec.MailboxRegistry; +import org.apache.ignite.internal.sql.engine.exec.MailboxRegistryImpl; +import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor; +import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl; +import org.apache.ignite.internal.sql.engine.exec.RowHandler; +import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler; +import org.apache.ignite.internal.sql.engine.exec.rel.Node; +import org.apache.ignite.internal.sql.engine.exec.rel.ScanNode; +import org.apache.ignite.internal.sql.engine.message.MessageService; +import org.apache.ignite.internal.sql.engine.message.MessageServiceImpl; +import org.apache.ignite.internal.sql.engine.metadata.MappingServiceImpl; +import org.apache.ignite.internal.sql.engine.prepare.PrepareService; 
+import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl; +import org.apache.ignite.internal.sql.engine.prepare.QueryPlan; +import org.apache.ignite.internal.sql.engine.prepare.ddl.DdlSqlToCommandConverter; +import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan; +import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager; +import org.apache.ignite.internal.sql.engine.util.BaseQueryContext; +import org.apache.ignite.internal.sql.engine.util.Commons; +import org.apache.ignite.internal.sql.engine.util.HashFunctionFactoryImpl; +import org.apache.ignite.internal.util.IgniteSpinBusyLock; +import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.network.ClusterService; +import org.apache.ignite.network.MessagingService; +import org.apache.ignite.network.TopologyService; + +/** + * An object representing a node in test cluster. + * + * <p>Provides convenient access to the methods for optimization and execution of the queries. + */ +public class TestNode implements LifecycleAware { + private final String nodeName; + private final SchemaPlus schema; + private final PrepareService prepareService; + private final ExecutionService executionService; + + private final List<LifecycleAware> services = new ArrayList<>(); + + /** + * Constructs the object. + * + * @param nodeName A name of the node to create. + * @param clusterService A cluster service. + * @param schemaManager A schema manager to use for query planning and execution. 
+ */ + TestNode( + String nodeName, + ClusterService clusterService, + SqlSchemaManager schemaManager + ) { + this.nodeName = nodeName; + this.prepareService = registerService(new PrepareServiceImpl(nodeName, 0, mock(DdlSqlToCommandConverter.class))); + this.schema = schemaManager.schema("PUBLIC"); + + TopologyService topologyService = clusterService.topologyService(); + MessagingService messagingService = clusterService.messagingService(); + RowHandler<Object[]> rowHandler = ArrayRowHandler.INSTANCE; + + MailboxRegistry mailboxRegistry = registerService(new MailboxRegistryImpl()); + QueryTaskExecutor taskExecutor = registerService(new QueryTaskExecutorImpl(nodeName)); + + MessageService messageService = registerService(new MessageServiceImpl( + topologyService, messagingService, taskExecutor, new IgniteSpinBusyLock() + )); + ExchangeService exchangeService = registerService(new ExchangeServiceImpl( + topologyService.localMember(), taskExecutor, mailboxRegistry, messageService + )); + + executionService = registerService(new ExecutionServiceImpl<>( + topologyService.localMember(), + messageService, + new MappingServiceImpl(topologyService), + schemaManager, + mock(DdlCommandHandler.class), + taskExecutor, + rowHandler, + exchangeService, + ctx -> new LogicalRelImplementor<Object[]>( + ctx, + new HashFunctionFactoryImpl<>(schemaManager, rowHandler), + mailboxRegistry, + exchangeService + ) { + @Override + public Node<Object[]> visit(IgniteTableScan rel) { + DataProvider<Object[]> dataProvider = rel.getTable().unwrap(TestTable.class).dataProvider(ctx.localNode().name()); + + return new ScanNode<>(ctx, dataProvider); + } + } + )); + } + + /** {@inheritDoc} */ + @Override + public void start() { + services.forEach(LifecycleAware::start); + } + + /** {@inheritDoc} */ + @Override + public void stop() throws Exception { + List<AutoCloseable> closeables = services.stream() + .map(service -> ((AutoCloseable) service::stop)) + .collect(Collectors.toList()); + + 
Collections.reverse(closeables); + IgniteUtils.closeAll(closeables); + } + + /** Returns the name of the current node. */ + public String name() { + return nodeName; + } + + /** + * Executes given plan on a cluster this node belongs to + * and returns an async cursor representing the result. + * + * @param plan A plan to execute. + * @return A cursor representing the result. + */ + public AsyncCursor<List<Object>> executePlan(QueryPlan plan) { + return executionService.executePlan(plan, createContext()); + } + + /** + * Prepares (aka parses, validates, and optimizes) the given query string + * and returns the plan to execute. + * + * @param query A query string to prepare. + * @return A plan to execute. + */ + public QueryPlan prepare(String query) { + SqlNodeList nodes = Commons.parse(query, FRAMEWORK_CONFIG.getParserConfig()); + + assertThat(nodes, hasSize(1)); + + return prepareService.prepareAsync(nodes.get(0), createContext()).join(); + } + + private BaseQueryContext createContext() { + return BaseQueryContext.builder() + .cancel(new QueryCancel()) + .frameworkConfig( + Frameworks.newConfigBuilder(FRAMEWORK_CONFIG) + .defaultSchema(schema) + .build() + ) + .build(); + } + + private <T extends LifecycleAware> T registerService(T service) { + services.add(service); + + return service; + } +} diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestTable.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestTable.java new file mode 100644 index 0000000000..02b40cd2be --- /dev/null +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestTable.java @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine.framework; + +import java.lang.reflect.Proxy; +import java.util.BitSet; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.apache.calcite.config.CalciteConnectionConfig; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelDistribution; +import org.apache.calcite.rel.RelReferentialConstraint; +import org.apache.calcite.rel.core.TableModify.Operation; +import org.apache.calcite.rel.hint.RelHint; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.Statistic; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.sql.engine.exec.ExecutionContext; +import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory; +import org.apache.ignite.internal.sql.engine.metadata.ColocationGroup; +import org.apache.ignite.internal.sql.engine.prepare.MappingQueryContext; +import 
org.apache.ignite.internal.sql.engine.rel.logical.IgniteLogicalIndexScan; +import org.apache.ignite.internal.sql.engine.rel.logical.IgniteLogicalTableScan; +import org.apache.ignite.internal.sql.engine.schema.IgniteIndex; +import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable; +import org.apache.ignite.internal.sql.engine.schema.ModifyRow; +import org.apache.ignite.internal.sql.engine.schema.TableDescriptor; +import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution; +import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory; +import org.apache.ignite.internal.table.InternalTable; +import org.jetbrains.annotations.Nullable; + +/** + * A test table which implements all methods necessary for the optimizer in order to be used + * for query preparation, as well as provides access to the data so that this table can be used + * in execution-related scenarios. + */ +public class TestTable implements InternalIgniteTable { + private static final AssertionError DATA_PROVIDER_NOT_CONFIGURED_EXCEPTION = + new AssertionError("DataProvider not configured"); + + private static final Map<String, DataProvider<?>> THROWING_MAP = (Map<String, DataProvider<?>>) Proxy.newProxyInstance( + TestTable.class.getClassLoader(), new Class<?>[] {Map.class}, (o, m, a) -> { + throw DATA_PROVIDER_NOT_CONFIGURED_EXCEPTION; + } + ); + + private final UUID id = UUID.randomUUID(); + private final Map<String, IgniteIndex> indexes = new HashMap<>(); + + private final String name; + private final double rowCnt; + private final ColocationGroup colocationGroup; + private final TableDescriptor descriptor; + private final Map<String, DataProvider<?>> dataProviders; + + + /** Creates a table without data providers; any access to them throws an AssertionError. */ + public TestTable( + TableDescriptor descriptor, + String name, + ColocationGroup colocationGroup, + double rowCnt + ) { + this.descriptor = descriptor; + this.name = name; + this.rowCnt = rowCnt; + this.colocationGroup = colocationGroup; + + dataProviders = THROWING_MAP; + } + + /** Constructor. 
*/ + public TestTable( + TableDescriptor descriptor, + String name, + Map<String, DataProvider<?>> dataProviders, + double rowCnt + ) { + this.descriptor = descriptor; + this.name = name; + this.rowCnt = rowCnt; + this.dataProviders = dataProviders; + + this.colocationGroup = ColocationGroup.forNodes(List.copyOf(dataProviders.keySet())); + } + + /** + * Returns the data provider for the given node. + * + * @param nodeName Name of the node of interest. + * @param <RowT> A type of the rows the data provider should produce. + * @return A data provider for the node of interest. + * @throws AssertionError in case data provider is not configured for the given node. + */ + <RowT> DataProvider<RowT> dataProvider(String nodeName) { + if (!dataProviders.containsKey(nodeName)) { + throw DATA_PROVIDER_NOT_CONFIGURED_EXCEPTION; + } + + return (DataProvider<RowT>) dataProviders.get(nodeName); + } + + /** {@inheritDoc} */ + @Override + public UUID id() { + return id; + } + + /** {@inheritDoc} */ + @Override + public int version() { + return 0; + } + + /** {@inheritDoc} */ + @Override + public IgniteLogicalTableScan toRel( + RelOptCluster cluster, + RelOptTable relOptTbl, + List<RelHint> hints, + @Nullable List<RexNode> proj, + @Nullable RexNode cond, + @Nullable ImmutableBitSet requiredColumns + ) { + return IgniteLogicalTableScan.create(cluster, cluster.traitSet(), hints, relOptTbl, proj, cond, requiredColumns); + } + + /** {@inheritDoc} */ + @Override + public IgniteLogicalIndexScan toRel( + RelOptCluster cluster, + RelOptTable relOptTbl, + String idxName, + @Nullable List<RexNode> proj, + @Nullable RexNode cond, + @Nullable ImmutableBitSet requiredColumns + ) { + return IgniteLogicalIndexScan.create(cluster, cluster.traitSet(), relOptTbl, idxName, proj, cond, requiredColumns); + } + + /** {@inheritDoc} */ + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory, ImmutableBitSet bitSet) { + return descriptor.rowType((IgniteTypeFactory) typeFactory, bitSet); + 
} + + /** {@inheritDoc} */ + @Override + public Statistic getStatistic() { + return new Statistic() { + /** {@inheritDoc} */ + @Override + public Double getRowCount() { + return rowCnt; + } + + /** {@inheritDoc} */ + @Override + public boolean isKey(ImmutableBitSet cols) { + return false; + } + + /** {@inheritDoc} */ + @Override + public List<ImmutableBitSet> getKeys() { + throw new AssertionError(); + } + + /** {@inheritDoc} */ + @Override + public List<RelReferentialConstraint> getReferentialConstraints() { + throw new AssertionError(); + } + + /** {@inheritDoc} */ + @Override + public List<RelCollation> getCollations() { + return Collections.emptyList(); + } + + /** {@inheritDoc} */ + @Override + public RelDistribution getDistribution() { + throw new AssertionError(); + } + }; + } + + /** {@inheritDoc} */ + @Override + public Schema.TableType getJdbcTableType() { + throw new AssertionError(); + } + + /** {@inheritDoc} */ + @Override + public boolean isRolledUp(String col) { + return false; + } + + /** {@inheritDoc} */ + @Override + public boolean rolledUpColumnValidInsideAgg( + String column, + SqlCall call, + SqlNode parent, + CalciteConnectionConfig config + ) { + throw new AssertionError(); + } + + /** {@inheritDoc} */ + @Override + public ColocationGroup colocationGroup(MappingQueryContext ctx) { + return colocationGroup; + } + + /** {@inheritDoc} */ + @Override + public IgniteDistribution distribution() { + return descriptor.distribution(); + } + + /** {@inheritDoc} */ + @Override + public TableDescriptor descriptor() { + return descriptor; + } + + /** {@inheritDoc} */ + @Override + public Map<String, IgniteIndex> indexes() { + return Collections.unmodifiableMap(indexes); + } + + /** {@inheritDoc} */ + @Override + public void addIndex(IgniteIndex idxTbl) { + indexes.put(idxTbl.name(), idxTbl); + } + + /** {@inheritDoc} */ + @Override + public IgniteIndex getIndex(String idxName) { + return indexes.get(idxName); + } + + /** {@inheritDoc} */ + @Override + 
public void removeIndex(String idxName) { + throw new AssertionError(); + } + + /** {@inheritDoc} */ + @Override + public String name() { + return name; + } + + /** {@inheritDoc} */ + @Override + public InternalTable table() { + throw new AssertionError(); + } + + /** {@inheritDoc} */ + @Override + public <RowT> RowT toRow(ExecutionContext<RowT> ectx, BinaryRow row, RowFactory<RowT> factory, + @Nullable BitSet requiredColumns) { + throw new AssertionError(); + } + + /** {@inheritDoc} */ + @Override + public <RowT> ModifyRow toModifyRow(ExecutionContext<RowT> ectx, RowT row, Operation op, @Nullable List<String> arg) { + throw new AssertionError(); + } +} diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java index 9d6fb2e7b1..3b4040bcd6 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java @@ -42,7 +42,6 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.Flow.Publisher; import java.util.function.BiFunction; -import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -95,6 +94,7 @@ import org.apache.ignite.internal.schema.NativeType; import org.apache.ignite.internal.sql.engine.exec.ExecutionContext; import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory; import org.apache.ignite.internal.sql.engine.externalize.RelJsonReader; +import org.apache.ignite.internal.sql.engine.framework.PredefinedSchemaManager; import org.apache.ignite.internal.sql.engine.metadata.ColocationGroup; import org.apache.ignite.internal.sql.engine.prepare.Cloner; import org.apache.ignite.internal.sql.engine.prepare.Fragment; @@ 
-112,10 +112,8 @@ import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor; import org.apache.ignite.internal.sql.engine.schema.DefaultValueStrategy; import org.apache.ignite.internal.sql.engine.schema.IgniteIndex; import org.apache.ignite.internal.sql.engine.schema.IgniteSchema; -import org.apache.ignite.internal.sql.engine.schema.IgniteTable; import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable; import org.apache.ignite.internal.sql.engine.schema.ModifyRow; -import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager; import org.apache.ignite.internal.sql.engine.schema.TableDescriptor; import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution; import org.apache.ignite.internal.sql.engine.trait.TraitUtils; @@ -666,17 +664,8 @@ public abstract class AbstractPlannerTest extends IgniteAbstractTest { List<RelNode> deserializedNodes = new ArrayList<>(); - Map<UUID, IgniteTable> tableMap = new HashMap<>(); - - for (IgniteSchema schema : schemas) { - tableMap.putAll(schema.getTableNames().stream() - .map(schema::getTable) - .map(IgniteTable.class::cast) - .collect(Collectors.toMap(IgniteTable::id, Function.identity()))); - } - for (String s : serialized) { - RelJsonReader reader = new RelJsonReader(new SqlSchemaManagerImpl(tableMap)); + RelJsonReader reader = new RelJsonReader(new PredefinedSchemaManager(schemas)); deserializedNodes.add(reader.read(s)); } @@ -1149,24 +1138,6 @@ public abstract class AbstractPlannerTest extends IgniteAbstractTest { } } - static class SqlSchemaManagerImpl implements SqlSchemaManager { - private final Map<UUID, IgniteTable> tablesById; - - public SqlSchemaManagerImpl(Map<UUID, IgniteTable> tablesById) { - this.tablesById = tablesById; - } - - @Override - public SchemaPlus schema(@Nullable String schema) { - throw new AssertionError(); - } - - @Override - public IgniteTable tableById(UUID id, int ver) { - return tablesById.get(id); - } - } - static class TestSortedIndex implements 
SortedIndex { private final UUID id = UUID.randomUUID();
