This is an automated email from the ASF dual-hosted git repository.
korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 8d7643f944 IGNITE-20714 Sql. Test Framework. Support DDL scripts to
initialise schema (#2752)
8d7643f944 is described below
commit 8d7643f9448fb341d488e3081babe465c5b39b7d
Author: korlov42 <[email protected]>
AuthorDate: Thu Oct 26 17:14:33 2023 +0300
IGNITE-20714 Sql. Test Framework. Support DDL scripts to initialise schema
(#2752)
---
.../internal/catalog/CatalogManagerImpl.java | 7 +-
.../internal/sql/engine/prepare/PlannerPhase.java | 3 -
.../RepeatedRandomRowDataProviderFactory.java | 45 --
.../sql/engine/benchmarks/SqlBenchmark.java | 9 +-
.../{TpchQueries.java => TpchHelper.java} | 14 +-
.../sql/engine/benchmarks/TpchParseBenchmark.java | 4 +-
.../engine/benchmarks/TpchPrepareBenchmark.java | 14 +-
.../internal/sql/engine/benchmarks/TpchSchema.java | 165 -----
.../sql/engine/framework/TestBuilders.java | 739 ++++++++++++++-------
.../internal/sql/engine/framework/TestCluster.java | 23 +-
.../sql/engine/framework/TestClusterTest.java | 130 +++-
.../internal/sql/engine/framework/TestIndex.java | 42 +-
.../internal/sql/engine/framework/TestNode.java | 102 ++-
.../internal/sql/engine/util/QueryCheckerTest.java | 8 +-
.../test/resources/tpch/schema_definition_ddl.sql | 125 ++++
15 files changed, 858 insertions(+), 572 deletions(-)
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
index 87bfafb5f1..9b475644d5 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
@@ -26,6 +26,7 @@ import static
org.apache.ignite.internal.catalog.CatalogParamsValidationUtils.va
import static
org.apache.ignite.internal.catalog.CatalogParamsValidationUtils.validateRenameZoneParams;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.fromParams;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.fromParamsAndPreviousValue;
+import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
import java.util.ArrayList;
import java.util.Arrays;
@@ -317,7 +318,11 @@ public class CatalogManagerImpl extends
AbstractEventProducer<CatalogEvent, Cata
}
@Override
- public CompletableFuture<Void> execute(List<CatalogCommand> commands)
throws IllegalArgumentException {
+ public CompletableFuture<Void> execute(List<CatalogCommand> commands) {
+ if (nullOrEmpty(commands)) {
+ return completedFuture(null);
+ }
+
return saveUpdateAndWaitForActivation(new
BulkUpdateProducer(List.copyOf(commands)));
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
index adda00bccb..e40d976799 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
@@ -126,9 +126,6 @@ public enum PlannerPhase {
FilterMergeRule.Config.DEFAULT
.withOperandFor(LogicalFilter.class).toRule(),
- JoinPushThroughJoinRule.Config.LEFT
- .withOperandFor(LogicalJoin.class).toRule(),
-
JoinPushThroughJoinRule.Config.RIGHT
.withOperandFor(LogicalJoin.class).toRule(),
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/RepeatedRandomRowDataProviderFactory.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/RepeatedRandomRowDataProviderFactory.java
deleted file mode 100644
index 40bba39604..0000000000
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/RepeatedRandomRowDataProviderFactory.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.sql.engine.benchmarks;
-
-import java.util.List;
-import org.apache.ignite.internal.sql.engine.framework.DataProvider;
-import
org.apache.ignite.internal.sql.engine.framework.TestBuilders.DataProviderFactory;
-import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
-import org.apache.ignite.internal.sql.engine.util.NativeTypeValues;
-
-/**
- * {@link DataProviderFactory} that creates {@link DataProvider}s that
generates a row of pseudo random data based on table column types
- * and then returns the same row multiple times.
- */
-final class RepeatedRandomRowDataProviderFactory implements
DataProviderFactory {
-
- private final int dataSize;
-
- RepeatedRandomRowDataProviderFactory(int dataSize) {
- this.dataSize = dataSize;
- }
-
- /** {@inheritDoc} **/
- @Override
- public DataProvider<Object[]> createDataProvider(String tableName,
List<ColumnDescriptor> columns) {
- Object[] row = columns.stream().map(c -> NativeTypeValues.value(1,
c.physicalType().spec())).toArray();
-
- return DataProvider.fromRow(row, dataSize);
- }
-}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/SqlBenchmark.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/SqlBenchmark.java
index ce13964d2b..10e0f410f1 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/SqlBenchmark.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/SqlBenchmark.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.sql.engine.benchmarks;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
-import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.sql.engine.framework.DataProvider;
@@ -27,7 +26,6 @@ import
org.apache.ignite.internal.sql.engine.framework.TestBuilders;
import org.apache.ignite.internal.sql.engine.framework.TestCluster;
import org.apache.ignite.internal.sql.engine.framework.TestNode;
import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
-import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.type.NativeTypes;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -66,11 +64,12 @@ public class SqlBenchmark {
.nodes("N1", "N2", "N3")
.addTable()
.name("T1")
- .distribution(IgniteDistributions.hash(List.of(0)))
- .addColumn("ID", NativeTypes.INT32)
+ .addKeyColumn("ID", NativeTypes.INT32)
.addColumn("VAL", NativeTypes.stringOf(64))
- .defaultDataProvider(dataProvider)
.end()
+ .dataProvider("N1", "T1", TestBuilders.tableScan(dataProvider))
+ .dataProvider("N2", "T1", TestBuilders.tableScan(dataProvider))
+ .dataProvider("N3", "T1", TestBuilders.tableScan(dataProvider))
.build();
// @formatter:on
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/TpchQueries.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/TpchHelper.java
similarity index 86%
rename from
modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/TpchQueries.java
rename to
modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/TpchHelper.java
index 1c5945fe00..9f58157bd7 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/TpchQueries.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/TpchHelper.java
@@ -27,9 +27,9 @@ import java.nio.charset.StandardCharsets;
/**
* Provides utility methods to work with queries defined by the TPC-H
benchmark.
*/
-public final class TpchQueries {
+public final class TpchHelper {
- private TpchQueries() {
+ private TpchHelper() {
}
@@ -65,8 +65,16 @@ public final class TpchQueries {
}
}
+ /**
+ * Returns a string representing the DDL script with all the tables' and
indexes' definitions
+ * required to execute queries from the TPC-H suite.
+ */
+ public static String getSchemaDefinitionScript() {
+ return loadFromResource("tpch/schema_definition_ddl.sql");
+ }
+
private static String loadFromResource(String resource) {
- try (InputStream is =
TpchQueries.class.getClassLoader().getResourceAsStream(resource)) {
+ try (InputStream is =
TpchHelper.class.getClassLoader().getResourceAsStream(resource)) {
if (is == null) {
throw new IllegalArgumentException("Resource does not exist: "
+ resource);
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/TpchParseBenchmark.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/TpchParseBenchmark.java
index 7392ed9074..545429a087 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/TpchParseBenchmark.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/TpchParseBenchmark.java
@@ -48,7 +48,7 @@ import org.openjdk.jmh.runner.options.OptionsBuilder;
public class TpchParseBenchmark {
/**
- * Identifiers of TPC-H queries. See {@link TpchQueries#getQuery(String)}.
+ * Identifiers of TPC-H queries. See {@link TpchHelper#getQuery(String)}.
*/
@Param({
"1", "2", "3", "4", "5", "6", "7", "8", "8v", "9", "10", "11",
"12", "12v",
@@ -61,7 +61,7 @@ public class TpchParseBenchmark {
/** Prepares the plan of the query. */
@Setup
public void setUp() {
- queryString = TpchQueries.getQuery(queryId);
+ queryString = TpchHelper.getQuery(queryId);
}
/**
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/TpchPrepareBenchmark.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/TpchPrepareBenchmark.java
index 424f21ba7a..65c15eda31 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/TpchPrepareBenchmark.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/TpchPrepareBenchmark.java
@@ -48,12 +48,12 @@ import org.openjdk.jmh.runner.options.OptionsBuilder;
@Measurement(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
-@Fork(3)
+@Fork(1)
@State(Scope.Benchmark)
public class TpchPrepareBenchmark {
/**
- * Identifiers of TPC-H queries. See {@link TpchQueries#getQuery(String)}.
+ * Identifiers of TPC-H queries. See {@link TpchHelper#getQuery(String)}.
*/
@Param({
"1", "2", "3", "4", "5", "6", "7", "8", "8v", "9", "10", "11",
"12", "12v",
@@ -70,15 +70,15 @@ public class TpchPrepareBenchmark {
/** Starts the cluster and prepares the plan of the query. */
@Setup
public void setUp() {
- var clusterBuilder = TestBuilders.cluster().nodes("N1");
- TpchSchema.registerTables(clusterBuilder, 1, 10);
-
- testCluster = clusterBuilder.build();
+ testCluster = TestBuilders.cluster().nodes("N1").build();
testCluster.start();
+
gatewayNode = testCluster.node("N1");
- String query = TpchQueries.getQuery(queryId);
+ gatewayNode.initSchema(TpchHelper.getSchemaDefinitionScript());
+
+ String query = TpchHelper.getQuery(queryId);
parsedResult = new ParserServiceImpl(0,
EmptyCacheFactory.INSTANCE).parse(query);
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/TpchSchema.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/TpchSchema.java
deleted file mode 100644
index 2f8d3c788e..0000000000
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/TpchSchema.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.sql.engine.benchmarks;
-
-import java.util.List;
-import
org.apache.ignite.internal.sql.engine.framework.TestBuilders.ClusterBuilder;
-import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
-import org.apache.ignite.internal.type.NativeTypes;
-
-/**
- * Provides utility methods to register tables described by the TPC-H
benchmark in a {@link ClusterBuilder}.
- */
-public final class TpchSchema {
-
- private static final int PART_SIZE = 200_000;
- private static final int SUPPLIER_SIZE = 10_000;
- // TPC-H 4.2.5.1 Table 3: Estimated Database Size
- private static final int PARTSUPP_SIZE = 80 * SUPPLIER_SIZE;
- private static final int ORDERS_SIZE = 1_500_000;
- // TPC-H 4.2.5.1 Table 3: Estimated Database Size
- private static final int LINEITEM_SIZE = 4 * ORDERS_SIZE;
- private static final int CUSTOMER_SIZE = 150_000;
- private static final int NATION_SIZE = 25;
- private static final int REGION_SIZE = 5;
-
- private TpchSchema() {
-
- }
-
- /**
- * Registers tables from the TPC-H benchmark in the given {@link
ClusterBuilder cluster builder} with the scaling factor of {@code 1}.
- *
- * @param clusterBuilder A cluster builder.
- * @param dataSize The number of rows data provider is going to produce
for each table.
- */
- public static void registerTables(ClusterBuilder clusterBuilder, int
dataSize) {
- registerTables(clusterBuilder, 1, dataSize);
- }
-
- /**
- * Registers tables from the TPC-H benchmark in the given cluster with the
given scaling factor.
- *
- * @param clusterBuilder A cluster builder.
- * @param scalingFactor Scaling factor.
- * @param dataSize The number of rows data provider is going to produce
for each table.
- */
- public static void registerTables(ClusterBuilder clusterBuilder, int
scalingFactor, int dataSize) {
- // Register default data provider factory that is going to generate
pseudo random data data.
- clusterBuilder.defaultDataProviderFactory(new
RepeatedRandomRowDataProviderFactory(dataSize));
-
- clusterBuilder.addTable().name("PART")
- .addColumn("P_PARTKEY", NativeTypes.INT64)
- .addColumn("P_NAME", NativeTypes.stringOf(55))
- .addColumn("P_MFGR", NativeTypes.stringOf(25))
- .addColumn("P_BRAND", NativeTypes.stringOf(10))
- .addColumn("P_TYPE", NativeTypes.stringOf(25))
- .addColumn("P_SIZE", NativeTypes.INT32)
- .addColumn("P_CONTAINER", NativeTypes.stringOf(10))
- .addColumn("P_RETAILPRICE", NativeTypes.decimalOf(15, 2))
- .addColumn("P_COMMENT", NativeTypes.stringOf(23))
- .distribution(IgniteDistributions.hash(List.of(0)))
- .size(scalingFactor * PART_SIZE).end();
-
- clusterBuilder.addTable().name("SUPPLIER")
- .addColumn("S_SUPPKEY", NativeTypes.INT64)
- .addColumn("S_NAME", NativeTypes.stringOf(25))
- .addColumn("S_ADDRESS", NativeTypes.stringOf(40))
- .addColumn("S_NATIONKEY", NativeTypes.INT64)
- .addColumn("S_PHONE", NativeTypes.stringOf(15))
- .addColumn("S_ACCTBAL", NativeTypes.decimalOf(15, 2))
- .addColumn("S_COMMENT", NativeTypes.stringOf(101))
- .distribution(IgniteDistributions.hash(List.of(0)))
- .size(scalingFactor * SUPPLIER_SIZE).end();
-
- clusterBuilder.addTable().name("PARTSUPP")
- .addColumn("PS_PARTKEY", NativeTypes.INT64)
- .addColumn("PS_SUPPKEY", NativeTypes.INT64)
- .addColumn("PS_AVAILQTY", NativeTypes.INT32)
- .addColumn("PS_SUPPLYCOST", NativeTypes.decimalOf(15, 2))
- .addColumn("PS_COMMENT", NativeTypes.stringOf(199))
- .distribution(IgniteDistributions.hash(List.of(0)))
- .size(scalingFactor * PARTSUPP_SIZE).end();
-
- clusterBuilder.addTable().name("CUSTOMER")
- .addColumn("C_CUSTKEY", NativeTypes.INT64)
- .addColumn("C_NAME", NativeTypes.stringOf(25))
- .addColumn("C_ADDRESS", NativeTypes.stringOf(40))
- .addColumn("C_NATIONKEY", NativeTypes.INT64)
- .addColumn("C_PHONE", NativeTypes.stringOf(15))
- .addColumn("C_ACCTBAL", NativeTypes.decimalOf(15, 2))
- .addColumn("C_MKTSEGMENT", NativeTypes.stringOf(10))
- .addColumn("C_COMMENT", NativeTypes.stringOf(117))
- .distribution(IgniteDistributions.hash(List.of(0)))
- .size(scalingFactor * CUSTOMER_SIZE)
- .end();
-
- clusterBuilder.addTable().name("ORDERS")
- .addColumn("O_ORDERKEY", NativeTypes.INT64)
- .addColumn("O_CUSTKEY", NativeTypes.INT64)
- .addColumn("O_ORDERSTATUS", NativeTypes.stringOf(1))
- .addColumn("O_TOTALPRICE", NativeTypes.decimalOf(15, 2))
- .addColumn("O_ORDERDATE", NativeTypes.datetime(6))
- .addColumn("O_ORDERPRIORITY", NativeTypes.stringOf(15))
- .addColumn("O_CLERK", NativeTypes.stringOf(15))
- .addColumn("O_SHIPPRIORITY", NativeTypes.INT32)
- .addColumn("O_COMMENT", NativeTypes.stringOf(79))
- .distribution(IgniteDistributions.hash(List.of(0)))
- .size(scalingFactor * ORDERS_SIZE)
- .end();
-
- clusterBuilder.addTable().name("LINEITEM")
- .addColumn("L_ORDERKEY", NativeTypes.INT64)
- .addColumn("L_PARTKEY", NativeTypes.INT64)
- .addColumn("L_SUPPKEY", NativeTypes.INT64)
- .addColumn("L_LINENUMBER", NativeTypes.INT32)
- .addColumn("L_QUANTITY", NativeTypes.decimalOf(15, 2))
- .addColumn("L_EXTENDEDPRICE", NativeTypes.decimalOf(15, 2))
- .addColumn("L_DISCOUNT", NativeTypes.decimalOf(15, 2))
- .addColumn("L_TAX", NativeTypes.decimalOf(15, 2))
- .addColumn("L_RETURNFLAG", NativeTypes.stringOf(1))
- .addColumn("L_LINESTATUS", NativeTypes.stringOf(1))
- .addColumn("L_SHIPDATE", NativeTypes.datetime(6))
- .addColumn("L_COMMITDATE", NativeTypes.datetime(6))
- .addColumn("L_RECEIPTDATE", NativeTypes.datetime(6))
- .addColumn("L_SHIPINSTRUCT", NativeTypes.stringOf(25))
- .addColumn("L_SHIPMODE", NativeTypes.stringOf(10))
- .addColumn("L_COMMENT", NativeTypes.stringOf(44))
- .distribution(IgniteDistributions.hash(List.of(0)))
- .size(scalingFactor * LINEITEM_SIZE)
- .end();
-
- clusterBuilder.addTable().name("NATION")
- .addColumn("N_NATIONKEY", NativeTypes.INT64)
- .addColumn("N_NAME", NativeTypes.stringOf(25))
- .addColumn("N_REGIONKEY", NativeTypes.INT64)
- .addColumn("N_COMMENT", NativeTypes.stringOf(152))
- .distribution(IgniteDistributions.hash(List.of(0)))
- .size(NATION_SIZE)
- .end();
-
- clusterBuilder.addTable().name("REGION")
- .addColumn("R_REGIONKEY", NativeTypes.INT64)
- .addColumn("R_NAME", NativeTypes.stringOf(25))
- .addColumn("N_COMMENT", NativeTypes.stringOf(152))
- .distribution(IgniteDistributions.hash(List.of(0)))
- .size(REGION_SIZE)
- .end();
-
- }
-}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index 33652960f6..32dec0855e 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -17,45 +17,88 @@
package org.apache.ignite.internal.sql.engine.framework;
-import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static
org.apache.ignite.internal.sql.engine.exec.ExecutionServiceImplTest.PLANNING_TIMEOUT;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+import static org.mockito.Mockito.mock;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.BitSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Flow.Publisher;
import java.util.function.Function;
import java.util.stream.Collectors;
+import org.apache.ignite.internal.catalog.CatalogCommand;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.CatalogTestUtils;
+import org.apache.ignite.internal.catalog.commands.ColumnParams;
+import org.apache.ignite.internal.catalog.commands.ColumnParams.Builder;
+import org.apache.ignite.internal.catalog.commands.CreateHashIndexCommand;
+import org.apache.ignite.internal.catalog.commands.CreateSortedIndexCommand;
+import org.apache.ignite.internal.catalog.commands.CreateTableCommand;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import org.apache.ignite.internal.catalog.descriptors.CatalogColumnCollation;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.sql.engine.exec.ExecutableTable;
+import org.apache.ignite.internal.sql.engine.exec.ExecutableTableRegistry;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.LifecycleAware;
+import org.apache.ignite.internal.sql.engine.exec.NodeWithTerm;
+import org.apache.ignite.internal.sql.engine.exec.PartitionWithTerm;
import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
+import org.apache.ignite.internal.sql.engine.exec.ScannableTable;
import org.apache.ignite.internal.sql.engine.exec.TxAttributes;
+import org.apache.ignite.internal.sql.engine.exec.UpdatableTable;
+import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
+import org.apache.ignite.internal.sql.engine.exec.exp.RangeCondition;
import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
import
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory;
import
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetProvider;
import org.apache.ignite.internal.sql.engine.exec.mapping.FragmentDescription;
import org.apache.ignite.internal.sql.engine.exec.mapping.MappingServiceImpl;
+import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl;
+import
org.apache.ignite.internal.sql.engine.prepare.ddl.DdlSqlToCommandConverter;
import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptorImpl;
import org.apache.ignite.internal.sql.engine.schema.DefaultValueStrategy;
import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Collation;
-import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView;
import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
+import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManagerImpl;
import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
import org.apache.ignite.internal.sql.engine.schema.TableDescriptorImpl;
+import org.apache.ignite.internal.sql.engine.sql.ParserServiceImpl;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
+import org.apache.ignite.internal.sql.engine.util.EmptyCacheFactory;
+import org.apache.ignite.internal.sql.engine.util.cache.CaffeineCacheFactory;
+import org.apache.ignite.internal.type.BitmaskNativeType;
+import org.apache.ignite.internal.type.DecimalNativeType;
import org.apache.ignite.internal.type.NativeType;
+import org.apache.ignite.internal.type.NativeTypeSpec;
+import org.apache.ignite.internal.type.NumberNativeType;
+import org.apache.ignite.internal.type.TemporalNativeType;
+import org.apache.ignite.internal.type.VarlenNativeType;
+import org.apache.ignite.internal.util.SubscriptionUtils;
+import org.apache.ignite.internal.util.TransformingIterator;
+import org.apache.ignite.internal.util.subscription.TransformingPublisher;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.jetbrains.annotations.Nullable;
@@ -64,10 +107,6 @@ import org.jetbrains.annotations.Nullable;
* A collection of builders to create test objects.
*/
public class TestBuilders {
-
- /** Schema version. */
- public static final int SCHEMA_VERSION = -1;
-
/** Returns a builder of the test cluster object. */
public static ClusterBuilder cluster() {
return new ClusterBuilderImpl();
@@ -88,6 +127,112 @@ public class TestBuilders {
return new ClusterServiceFactory(nodes);
}
+ /**
+ * Factory method to create a {@link ScannableTable table} instance from
the given data provider that
+ * implements only {@link ScannableTable#scan table scan}.
+ */
+ public static ScannableTable tableScan(DataProvider<Object[]>
dataProvider) {
+ return new ScannableTable() {
+ @Override
+ public <RowT> Publisher<RowT> scan(ExecutionContext<RowT> ctx,
PartitionWithTerm partWithTerm, RowFactory<RowT> rowFactory,
+ @Nullable BitSet requiredColumns) {
+
+ return new TransformingPublisher<>(
+ SubscriptionUtils.fromIterable(
+ () -> new TransformingIterator<>(
+ dataProvider.iterator(),
+ row -> project(row, requiredColumns)
+ )
+ ),
+ rowFactory::create
+ );
+ }
+
+ @Override
+ public <RowT> Publisher<RowT>
indexRangeScan(ExecutionContext<RowT> ctx, PartitionWithTerm partWithTerm,
+ RowFactory<RowT> rowFactory, int indexId, List<String>
columns, @Nullable RangeCondition<RowT> cond,
+ @Nullable BitSet requiredColumns) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <RowT> Publisher<RowT> indexLookup(ExecutionContext<RowT>
ctx, PartitionWithTerm partWithTerm,
+ RowFactory<RowT> rowFactory, int indexId, List<String>
columns, RowT key, @Nullable BitSet requiredColumns) {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ /**
+ * Factory method to create a {@link ScannableTable table} instance from
the given data provider that
+ * implements only {@link ScannableTable#indexRangeScan index range scan}.
+ */
+ public static ScannableTable indexRangeScan(DataProvider<Object[]>
dataProvider) {
+ return new ScannableTable() {
+ @Override
+ public <RowT> Publisher<RowT> scan(ExecutionContext<RowT> ctx,
PartitionWithTerm partWithTerm, RowFactory<RowT> rowFactory,
+ @Nullable BitSet requiredColumns) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <RowT> Publisher<RowT>
indexRangeScan(ExecutionContext<RowT> ctx, PartitionWithTerm partWithTerm,
+ RowFactory<RowT> rowFactory, int indexId, List<String>
columns, @Nullable RangeCondition<RowT> cond,
+ @Nullable BitSet requiredColumns) {
+ return new TransformingPublisher<>(
+ SubscriptionUtils.fromIterable(
+ () -> new TransformingIterator<>(
+ dataProvider.iterator(),
+ row -> project(row, requiredColumns)
+ )
+ ),
+ rowFactory::create
+ );
+ }
+
+ @Override
+ public <RowT> Publisher<RowT> indexLookup(ExecutionContext<RowT>
ctx, PartitionWithTerm partWithTerm,
+ RowFactory<RowT> rowFactory, int indexId, List<String>
columns, RowT key, @Nullable BitSet requiredColumns) {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ /**
+ * Factory method to create a {@link ScannableTable table} instance from
the given data provider that
+ * implements only {@link ScannableTable#indexLookup index lookup}.
+ */
+ public static ScannableTable indexLookup(DataProvider<Object[]>
dataProvider) {
+ return new ScannableTable() {
+ @Override
+ public <RowT> Publisher<RowT> scan(ExecutionContext<RowT> ctx,
PartitionWithTerm partWithTerm, RowFactory<RowT> rowFactory,
+ @Nullable BitSet requiredColumns) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <RowT> Publisher<RowT>
indexRangeScan(ExecutionContext<RowT> ctx, PartitionWithTerm partWithTerm,
+ RowFactory<RowT> rowFactory, int indexId, List<String>
columns, @Nullable RangeCondition<RowT> cond,
+ @Nullable BitSet requiredColumns) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <RowT> Publisher<RowT> indexLookup(ExecutionContext<RowT>
ctx, PartitionWithTerm partWithTerm,
+ RowFactory<RowT> rowFactory, int indexId, List<String>
columns, RowT key, @Nullable BitSet requiredColumns) {
+ return new TransformingPublisher<>(
+ SubscriptionUtils.fromIterable(
+ () -> new TransformingIterator<>(
+ dataProvider.iterator(),
+ row -> project(row, requiredColumns)
+ )
+ ),
+ rowFactory::create
+ );
+ }
+ };
+ }
+
/**
* A builder to create a test cluster object.
*
@@ -112,22 +257,21 @@ public class TestBuilders {
ClusterTableBuilder addTable();
/**
- * When specified the given factory is used to create instances of
- * {@link ClusterTableBuilder#defaultDataProvider(DataProvider)
default data providers} for tables that have no
- * {@link ClusterTableBuilder#defaultDataProvider(DataProvider)
default data provider} set.
- *
- * <p>Note: when a table has default data provider this method has no
effect.
+ * Builds the cluster object.
*
- * @return {@code this} for chaining.
+ * @return Created cluster object.
*/
- ClusterBuilder defaultDataProviderFactory(DataProviderFactory
dataProviderFactory);
+ TestCluster build();
/**
- * Builds the cluster object.
+ * Provides the implementation of the table with the given name that is local to the given
node.
*
- * @return Created cluster object.
+ * @param nodeName Name of the node the given table instance will be
assigned to.
+ * @param tableName Name of the table given instance represents.
+ * @param table Actual table that will be used for read operations
during execution.
+ * @return {@code this} for chaining.
*/
- TestCluster build();
+ ClusterBuilder dataProvider(String nodeName, String tableName,
ScannableTable table);
}
/**
@@ -137,17 +281,23 @@ public class TestBuilders {
*/
public interface TableBuilder extends TableBuilderBase<TableBuilder> {
/** Returns a builder of the test sorted-index object. */
- public SortedIndexBuilder sortedIndex();
+ SortedIndexBuilder sortedIndex();
/** Returns a builder of the test hash-index object. */
- public HashIndexBuilder hashIndex();
+ HashIndexBuilder hashIndex();
+
+ /** Sets the distribution of the table. */
+ TableBuilder distribution(IgniteDistribution distribution);
+
+ /** Sets the size of the table. */
+ TableBuilder size(int size);
/**
* Builds a table.
*
* @return Created table object.
*/
- public TestTable build();
+ TestTable build();
}
/**
@@ -173,7 +323,6 @@ public class TestBuilders {
* @see TestCluster
*/
public interface ClusterTableBuilder extends
TableBuilderBase<ClusterTableBuilder>,
- DataSourceBuilder<ClusterTableBuilder>,
NestedBuilder<ClusterBuilder> {
/**
@@ -198,7 +347,6 @@ public class TestBuilders {
* @see TestCluster
*/
public interface ClusterSortedIndexBuilder extends
SortedIndexBuilderBase<ClusterSortedIndexBuilder>,
- DataSourceBuilder<ClusterSortedIndexBuilder>,
NestedBuilder<ClusterTableBuilder> {
}
@@ -209,25 +357,9 @@ public class TestBuilders {
* @see TestCluster
*/
public interface ClusterHashIndexBuilder extends
HashIndexBuilderBase<ClusterHashIndexBuilder>,
- DataSourceBuilder<ClusterHashIndexBuilder>,
NestedBuilder<ClusterTableBuilder> {
}
- /**
- * A builder interface to enrich a builder object with data-source related
fields.
- */
- public interface DataSourceBuilder<ChildT> {
- /**
- * Adds a default data provider, which will be used for those nodes
for which no specific provider is specified.
- *
- * <p>Note: this method will force all nodes in the cluster to have a
data provider for the given object.
- */
- ChildT defaultDataProvider(DataProvider<?> dataProvider);
-
- /** Adds a data provider for the given node to the data source object.
*/
- ChildT addDataProvider(String targetNode, DataProvider<?>
dataProvider);
- }
-
/**
* A builder to create an execution context.
*
@@ -312,8 +444,8 @@ public class TestBuilders {
private static class ClusterBuilderImpl implements ClusterBuilder {
private final List<ClusterTableBuilderImpl> tableBuilders = new
ArrayList<>();
- private DataProviderFactory dataProviderFactory;
private List<String> nodeNames;
+ private final Map<String, Map<String, ScannableTable>>
nodeName2tableName2table = new HashMap<>();
/** {@inheritDoc} */
@Override
@@ -332,48 +464,57 @@ public class TestBuilders {
return new ClusterTableBuilderImpl(this);
}
- /** {@inheritDoc} */
@Override
- public ClusterBuilder defaultDataProviderFactory(DataProviderFactory
dataProviderFactory) {
- this.dataProviderFactory = dataProviderFactory;
+ public ClusterBuilder dataProvider(String nodeName, String tableName,
ScannableTable table) {
+ nodeName2tableName2table.computeIfAbsent(nodeName, key -> new
HashMap<>()).put(tableName, table);
+
return this;
}
/** {@inheritDoc} */
@Override
public TestCluster build() {
+ validateConfiguredDataProviders();
+
var clusterService = new ClusterServiceFactory(nodeNames);
- Map<String, Map<String, DataProvider<?>>> dataProvidersByTableName
= new HashMap<>();
- for (ClusterTableBuilderImpl tableBuilder : tableBuilders) {
- validateDataSourceBuilder(tableBuilder);
- injectDefaultDataProvidersIfNeeded(tableBuilder);
- injectDataProvidersIfNeeded(tableBuilder);
+ var clusterName = "test_cluster";
- for (AbstractIndexBuilderImpl<?> indexBuilder :
tableBuilder.indexBuilders) {
- validateDataSourceBuilder(indexBuilder);
- injectDataProvidersIfNeeded(indexBuilder);
- }
+ CatalogManager catalogManager =
CatalogTestUtils.createCatalogManagerWithTestUpdateLog(clusterName, new
HybridClockImpl());
+
+ var parserService = new ParserServiceImpl(0,
EmptyCacheFactory.INSTANCE);
+ var prepareService = new PrepareServiceImpl(clusterName, 0,
CaffeineCacheFactory.INSTANCE,
+ new DdlSqlToCommandConverter(Map.of(), () -> "aipersist"),
PLANNING_TIMEOUT, mock(MetricManager.class));
- dataProvidersByTableName.put(tableBuilder.name,
tableBuilder.dataProviders);
+ Map<String, List<String>> owningNodesByTableName = new HashMap<>();
+ for (Entry<String, Map<String, ScannableTable>> entry :
nodeName2tableName2table.entrySet()) {
+ for (String tableName : entry.getValue().keySet()) {
+ owningNodesByTableName.computeIfAbsent(tableName, key ->
new ArrayList<>()).add(entry.getKey());
+ }
}
- Map<String, IgniteTable> tableByName = tableBuilders.stream()
- .map(ClusterTableBuilderImpl::build)
- .collect(Collectors.toMap(TestTable::name,
Function.identity()));
+ List<CatalogCommand> initialSchema = tableBuilders.stream()
+ .flatMap(builder -> builder.build().stream())
+ .collect(Collectors.toList());
+
+ Runnable initClosure = () ->
await(catalogManager.execute(initialSchema));
- IgniteSchema schema = new IgniteSchema(DEFAULT_SCHEMA_NAME,
SCHEMA_VERSION, tableByName.values());
- var schemaManager = new PredefinedSchemaManager(schema);
+ var ddlHandler = new DdlCommandHandler(catalogManager);
+ var schemaManager = new SqlSchemaManagerImpl(catalogManager,
CaffeineCacheFactory.INSTANCE, 0);
var targetProvider = new ExecutionTargetProvider() {
@Override
public CompletableFuture<ExecutionTarget>
forTable(ExecutionTargetFactory factory, IgniteTable table) {
- Map<String, DataProvider<?>> dataProviders =
dataProvidersByTableName.get(table.name());
+ List<String> owningNodes =
owningNodesByTableName.get(table.name());
- if (nullOrEmpty(dataProviders)) {
+ if (nullOrEmpty(owningNodes)) {
throw new AssertionError("DataProvider is not
configured for table " + table.name());
}
- return
CompletableFuture.completedFuture(factory.allOf(List.copyOf(dataProviders.keySet())));
+ ExecutionTarget target =
factory.partitioned(owningNodes.stream()
+ .map(name -> new NodeWithTerm(name, 1))
+ .collect(Collectors.toList()));
+
+ return CompletableFuture.completedFuture(target);
}
@Override
@@ -393,47 +534,65 @@ public class TestBuilders {
mappingService.onTopologyLeap(new
LogicalTopologySnapshot(1L, logicalNodes));
return new TestNode(
- name, clusterService.forNode(name),
schemaManager, mappingService
+ name,
+ clusterService.forNode(name),
+ parserService,
+ prepareService,
+ schemaManager,
+ mappingService,
+ new
TestExecutableTableRegistry(nodeName2tableName2table.get(name), schemaManager),
+ ddlHandler
);
})
.collect(Collectors.toMap(TestNode::name,
Function.identity()));
- return new TestCluster(nodes);
+ return new TestCluster(
+ nodes,
+ List.of(new
ComponentToLifecycleAwareAdaptor(catalogManager), prepareService),
+ initClosure
+ );
}
- private void
validateDataSourceBuilder(AbstractDataSourceBuilderImpl<?> tableBuilder) {
- Set<String> tableOwners = new
HashSet<>(tableBuilder.dataProviders.keySet());
+ private void validateConfiguredDataProviders() {
+ Set<String> dataProvidersOwners = new
HashSet<>(nodeName2tableName2table.keySet());
- tableOwners.removeAll(nodeNames);
+ dataProvidersOwners.removeAll(Set.copyOf(nodeNames));
- if (!tableOwners.isEmpty()) {
- throw new AssertionError(format("The table has a dataProvider
that is outside the cluster "
- + "[tableName={}, outsiders={}]", tableBuilder.name,
tableOwners));
- }
- }
+ if (!dataProvidersOwners.isEmpty()) {
+ Map<String, List<String>> problematicTables = new HashMap<>();
+
+ for (String outsiderNode : dataProvidersOwners) {
+ for (String problematicTable :
nodeName2tableName2table.get(outsiderNode).keySet()) {
+ problematicTables.computeIfAbsent(problematicTable, k
-> new ArrayList<>()).add(outsiderNode);
+ }
+ }
+
+ String problematicTablesString =
problematicTables.entrySet().stream()
+ .map(e -> e.getKey() + ": " + e.getValue())
+ .collect(Collectors.joining(", "));
- private void
injectDefaultDataProvidersIfNeeded(ClusterTableBuilderImpl tableBuilder) {
- if (tableBuilder.defaultDataProvider == null &&
dataProviderFactory != null) {
- tableBuilder.defaultDataProvider =
dataProviderFactory.createDataProvider(tableBuilder.name, tableBuilder.columns);
+ throw new AssertionError(format("The table has a dataProvider
that is outside the cluster "
+ + "[{}]", problematicTablesString));
}
}
+ }
- private void
injectDataProvidersIfNeeded(AbstractDataSourceBuilderImpl<?> builder) {
- if (builder.defaultDataProvider == null) {
- return;
- }
+ private static class TableBuilderImpl implements TableBuilder {
+ private final List<AbstractTableIndexBuilderImpl<?>> indexBuilders =
new ArrayList<>();
+ private final List<ColumnDescriptor> columns = new ArrayList<>();
- Set<String> nodesWithoutDataProvider = new HashSet<>(nodeNames);
+ private String name;
+ private IgniteDistribution distribution;
+ private int size = 100_000;
- nodesWithoutDataProvider.removeAll(builder.dataProviders.keySet());
+ /** {@inheritDoc} */
+ @Override
+ public TableBuilder name(String name) {
+ this.name = name;
- for (String name : nodesWithoutDataProvider) {
- builder.addDataProvider(name, builder.defaultDataProvider);
- }
+ return this;
}
- }
- private static class TableBuilderImpl extends
AbstractTableBuilderImpl<TableBuilder> implements TableBuilder {
/** {@inheritDoc} */
@Override
public SortedIndexBuilder sortedIndex() {
@@ -446,6 +605,63 @@ public class TestBuilders {
return new HashIndexBuilderImpl(this);
}
+ /** {@inheritDoc} */
+ @Override
+ public TableBuilder distribution(IgniteDistribution distribution) {
+ this.distribution = distribution;
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public TableBuilder addColumn(String name, NativeType type, boolean
nullable) {
+ columns.add(new ColumnDescriptorImpl(
+ name, false, nullable, columns.size(), type,
DefaultValueStrategy.DEFAULT_NULL, null
+ ));
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public TableBuilder addColumn(String name, NativeType type) {
+ return addColumn(name, type, true);
+ }
+
+        /** {@inheritDoc} */
+        @Override
+        public TableBuilder addColumn(String name, NativeType type, @Nullable Object defaultValue) {
+            if (defaultValue != null) {
+                // A non-null default turns the column into a nullable one backed by a constant default supplier.
+                columns.add(new ColumnDescriptorImpl(
+                        name, false, true, columns.size(), type, DefaultValueStrategy.DEFAULT_CONSTANT, () -> defaultValue
+                ));
+
+                return this;
+            }
+
+            // Without a default value this is just a regular nullable column.
+            return addColumn(name, type);
+        }
+
+ /** {@inheritDoc} */
+ @Override
+ public TableBuilder addKeyColumn(String name, NativeType type) {
+ columns.add(new ColumnDescriptorImpl(
+ name, true, false, columns.size(), type,
DefaultValueStrategy.DEFAULT_NULL, null
+ ));
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public TableBuilder size(int size) {
+ this.size = size;
+
+ return this;
+ }
+
/** {@inheritDoc} */
@Override
public TestTable build() {
@@ -474,21 +690,30 @@ public class TestBuilders {
indexes
);
}
-
- /** {@inheritDoc} */
- @Override
- protected TableBuilder self() {
- return this;
- }
}
- private static class ClusterTableBuilderImpl extends
AbstractTableBuilderImpl<ClusterTableBuilder> implements ClusterTableBuilder {
+ private static class ClusterTableBuilderImpl implements
ClusterTableBuilder {
+ private final List<AbstractClusterTableIndexBuilderImpl<?>>
indexBuilders = new ArrayList<>();
+
+ private final List<ColumnParams> columns = new ArrayList<>();
+ private final List<String> keyColumns = new ArrayList<>();
+
private final ClusterBuilderImpl parent;
+ private String name;
+
private ClusterTableBuilderImpl(ClusterBuilderImpl parent) {
this.parent = parent;
}
+ /** {@inheritDoc} */
+ @Override
+ public ClusterTableBuilder name(String name) {
+ this.name = name;
+
+ return this;
+ }
+
/** {@inheritDoc} */
@Override
public ClusterSortedIndexBuilder addSortedIndex() {
@@ -501,12 +726,33 @@ public class TestBuilders {
return new ClusterHashIndexBuilderImpl(this);
}
+ @Override
+ public ClusterTableBuilder addColumn(String name, NativeType type,
boolean nullable) {
+ columns.add(columnParams(name, type, nullable, null));
+
+ return this;
+ }
+
/** {@inheritDoc} */
@Override
- protected ClusterTableBuilder self() {
+ public ClusterTableBuilder addColumn(String name, NativeType type) {
+ return addColumn(name, type, true);
+ }
+
+ @Override
+ public ClusterTableBuilder addColumn(String name, NativeType type,
@Nullable Object defaultValue) {
+ columns.add(columnParams(name, type, true, defaultValue));
+
return this;
}
+ @Override
+ public ClusterTableBuilder addKeyColumn(String name, NativeType type) {
+ keyColumns.add(name);
+
+ return addColumn(name, type, false);
+ }
+
/** {@inheritDoc} */
@Override
public ClusterBuilder end() {
@@ -515,18 +761,27 @@ public class TestBuilders {
return parent;
}
- private TestTable build() {
- TableDescriptorImpl tableDescriptor = new
TableDescriptorImpl(columns, distribution);
+ private List<CatalogCommand> build() {
+ List<CatalogCommand> commands = new ArrayList<>(1 +
indexBuilders.size());
- List<IgniteIndex> indexes = indexBuilders.stream()
- .map(idx -> idx.build(tableDescriptor))
- .collect(Collectors.toList());
+ commands.add(
+ CreateTableCommand.builder()
+ .schemaName("PUBLIC")
+ .tableName(name)
+ .columns(columns)
+ .primaryKeyColumns(keyColumns)
+ .build()
+ );
+
+ for (AbstractClusterTableIndexBuilderImpl<?> builder :
indexBuilders) {
+ commands.add(builder.build("PUBLIC", name));
+ }
- return new TestTable(tableDescriptor, name, size, indexes,
dataProviders);
+ return commands;
}
}
- private static class SortedIndexBuilderImpl extends
AbstractIndexBuilderImpl<SortedIndexBuilder>
+ private static class SortedIndexBuilderImpl extends
AbstractTableIndexBuilderImpl<SortedIndexBuilder>
implements SortedIndexBuilder {
private final TableBuilderImpl parent;
@@ -563,11 +818,11 @@ public class TestBuilders {
throw new IllegalArgumentException("Collation must be
specified for each of columns.");
}
- return TestIndex.createSorted(name, columns, collations, desc,
dataProviders);
+ return TestIndex.createSorted(name, columns, collations, desc);
}
}
- private static class HashIndexBuilderImpl extends
AbstractIndexBuilderImpl<HashIndexBuilder> implements HashIndexBuilder {
+ private static class HashIndexBuilderImpl extends
AbstractTableIndexBuilderImpl<HashIndexBuilder> implements HashIndexBuilder {
private final TableBuilderImpl parent;
private HashIndexBuilderImpl(TableBuilderImpl parent) {
@@ -601,11 +856,11 @@ public class TestBuilders {
assert collations == null : "Collation is not supported.";
- return TestIndex.createHash(name, columns, desc, dataProviders);
+ return TestIndex.createHash(name, columns, desc);
}
}
- private static class ClusterSortedIndexBuilderImpl extends
AbstractIndexBuilderImpl<ClusterSortedIndexBuilder>
+ private static class ClusterSortedIndexBuilderImpl extends
AbstractClusterTableIndexBuilderImpl<ClusterSortedIndexBuilder>
implements ClusterSortedIndexBuilder {
private final ClusterTableBuilderImpl parent;
@@ -628,14 +883,24 @@ public class TestBuilders {
}
@Override
- TestIndex build(TableDescriptor desc) {
+ CatalogCommand build(String schemaName, String tableName) {
assert collations.size() == columns.size();
- return TestIndex.createSorted(name, columns, collations, desc,
dataProviders);
+ List<CatalogColumnCollation> catalogCollations =
collations.stream()
+ .map(c -> CatalogColumnCollation.get(c.asc, c.nullsFirst))
+ .collect(Collectors.toList());
+
+ return CreateSortedIndexCommand.builder()
+ .schemaName(schemaName)
+ .tableName(tableName)
+ .indexName(name)
+ .columns(columns)
+ .collations(catalogCollations)
+ .build();
}
}
- private static class ClusterHashIndexBuilderImpl extends
AbstractIndexBuilderImpl<ClusterHashIndexBuilder>
+ private static class ClusterHashIndexBuilderImpl extends
AbstractClusterTableIndexBuilderImpl<ClusterHashIndexBuilder>
implements ClusterHashIndexBuilder {
private final ClusterTableBuilderImpl parent;
@@ -658,36 +923,20 @@ public class TestBuilders {
}
@Override
- TestIndex build(TableDescriptor desc) {
- assert collations == null;
-
- return TestIndex.createHash(name, columns, desc, dataProviders);
+ CatalogCommand build(String schemaName, String tableName) {
+ return CreateHashIndexCommand.builder()
+ .schemaName(schemaName)
+ .tableName(tableName)
+ .indexName(name)
+ .columns(columns)
+ .build();
}
}
- /**
- * A factory that creates {@link DataProvider data providers}.
- */
- @FunctionalInterface
- public interface DataProviderFactory {
-
- /**
- * Creates a {@link DataProvider} for the given table.
- *
- * @param tableName a table name.
- * @param columns a list of columns.
- * @return an instance of {@link DataProvider}.
- */
- DataProvider<Object[]> createDataProvider(String tableName,
List<ColumnDescriptor> columns);
- }
-
- private abstract static class AbstractTableBuilderImpl<ChildT> extends
AbstractDataSourceBuilderImpl<ChildT>
- implements TableBuilderBase<ChildT> {
- protected final List<ColumnDescriptor> columns = new ArrayList<>();
- protected final List<AbstractIndexBuilderImpl> indexBuilders = new
ArrayList<>();
-
- protected IgniteDistribution distribution;
- protected int size = 100_000;
+ private abstract static class AbstractIndexBuilderImpl<ChildT> implements
SortedIndexBuilderBase<ChildT>, HashIndexBuilderBase<ChildT> {
+ String name;
+ final List<String> columns = new ArrayList<>();
+ List<Collation> collations;
/** {@inheritDoc} */
@Override
@@ -697,68 +946,6 @@ public class TestBuilders {
return self();
}
- /** {@inheritDoc} */
- @Override
- public ChildT distribution(IgniteDistribution distribution) {
- this.distribution = distribution;
-
- return self();
- }
-
- /** {@inheritDoc} */
- @Override
- public ChildT addKeyColumn(String name, NativeType type) {
- columns.add(new ColumnDescriptorImpl(
- name, true, false, columns.size(), type,
DefaultValueStrategy.DEFAULT_NULL, null
- ));
-
- return self();
- }
-
- /** {@inheritDoc} */
- @Override
- public ChildT addColumn(String name, NativeType type) {
- return addColumn(name, type, true);
- }
-
- /** {@inheritDoc} */
- @Override
- public ChildT addColumn(String name, NativeType type, boolean
nullable) {
- columns.add(new ColumnDescriptorImpl(
- name, false, nullable, columns.size(), type,
DefaultValueStrategy.DEFAULT_NULL, null
- ));
-
- return self();
- }
-
- /** {@inheritDoc} */
- @Override
- public ChildT addColumn(String name, NativeType type, @Nullable Object
defaultValue) {
- if (defaultValue == null) {
- return addColumn(name, type);
- } else {
- ColumnDescriptorImpl desc = new ColumnDescriptorImpl(
- name, false, true, columns.size(), type,
DefaultValueStrategy.DEFAULT_CONSTANT, () -> defaultValue
- );
- columns.add(desc);
- }
- return self();
- }
-
- /** {@inheritDoc} */
- @Override
- public ChildT size(int size) {
- this.size = size;
-
- return self();
- }
- }
-
- private abstract static class AbstractIndexBuilderImpl<ChildT> extends
AbstractDataSourceBuilderImpl<ChildT>
- implements SortedIndexBuilderBase<ChildT>,
HashIndexBuilderBase<ChildT> {
- protected final List<String> columns = new ArrayList<>();
- protected List<Collation> collations;
-
/** {@inheritDoc} */
@Override
public ChildT addColumn(String columnName) {
@@ -780,34 +967,15 @@ public class TestBuilders {
return self();
}
- abstract TestIndex build(TableDescriptor desc);
- }
-
- private abstract static class AbstractDataSourceBuilderImpl<ChildT> {
-
- protected String name;
- final Map<String, DataProvider<?>> dataProviders = new HashMap<>();
- DataProvider<?> defaultDataProvider = null;
-
abstract ChildT self();
+ }
- public ChildT name(String name) {
- this.name = name;
-
- return self();
- }
-
- public ChildT defaultDataProvider(DataProvider<?> dataProvider) {
- this.defaultDataProvider = dataProvider;
-
- return self();
- }
-
- public ChildT addDataProvider(String targetNode, DataProvider<?>
dataProvider) {
- this.dataProviders.put(targetNode, dataProvider);
+ private abstract static class AbstractTableIndexBuilderImpl<ChildT>
extends AbstractIndexBuilderImpl<ChildT> {
+ abstract TestIndex build(TableDescriptor desc);
+ }
- return self();
- }
+ private abstract static class AbstractClusterTableIndexBuilderImpl<ChildT>
extends AbstractIndexBuilderImpl<ChildT> {
+ abstract CatalogCommand build(String schemaName, String tableName);
}
/**
@@ -823,9 +991,6 @@ public class TestBuilders {
/** Sets the name of the table. */
ChildT name(String name);
- /** Sets the distribution of the table. */
- ChildT distribution(IgniteDistribution distribution);
-
/** Adds a key column to the table. */
ChildT addKeyColumn(String name, NativeType type);
@@ -837,9 +1002,6 @@ public class TestBuilders {
/** Adds a column with the given default value to the table. */
ChildT addColumn(String name, NativeType type, @Nullable Object
defaultValue);
-
- /** Sets the size of the table. */
- ChildT size(int size);
}
/**
@@ -911,4 +1073,131 @@ public class TestBuilders {
*/
ParentT end();
}
+
+    /**
+     * Adaptor that exposes an {@link IgniteComponent} through the {@link LifecycleAware} interface by
+     * forwarding {@code start} and {@code stop} calls to the wrapped component.
+     */
+    private static class ComponentToLifecycleAwareAdaptor implements LifecycleAware {
+        /** Wrapped component all lifecycle calls are forwarded to. */
+        private final IgniteComponent delegate;
+
+        ComponentToLifecycleAwareAdaptor(IgniteComponent component) {
+            delegate = component;
+        }
+
+        /** Starts the wrapped component. */
+        @Override
+        public void start() {
+            delegate.start();
+        }
+
+        /** Stops the wrapped component. */
+        @Override
+        public void stop() throws Exception {
+            delegate.stop();
+        }
+    }
+
+    /**
+     * Registry of executable tables for a single test node.
+     *
+     * <p>The table is resolved through the schema manager by schema version and table id, and the
+     * {@link ScannableTable} serving reads is looked up by table name in the map supplied at construction.
+     * Update operations are not supported.
+     */
+    private static class TestExecutableTableRegistry implements ExecutableTableRegistry {
+        private final Map<String, ScannableTable> tablesByName;
+        private final SqlSchemaManager schemaManager;
+
+        /**
+         * Creates a registry.
+         *
+         * @param tablesByName Scannable tables of the node keyed by table name; {@code null} is treated as "no tables".
+         * @param schemaManager Schema manager used to resolve a table by schema version and table id.
+         */
+        TestExecutableTableRegistry(@Nullable Map<String, ScannableTable> tablesByName, SqlSchemaManager schemaManager) {
+            // The cluster builder passes the result of Map#get here, which is null for a node that has no
+            // table registered. Normalise it so the lookup below fails with a descriptive assertion, not an NPE.
+            this.tablesByName = tablesByName == null ? Map.of() : tablesByName;
+            this.schemaManager = schemaManager;
+        }
+
+        @Override
+        public CompletableFuture<ExecutableTable> getTable(int schemaVersion, int tableId) {
+            IgniteTable table = schemaManager.table(schemaVersion, tableId);
+
+            assert table != null : "Table not found [schemaVersion=" + schemaVersion + ", tableId=" + tableId + ']';
+
+            return CompletableFuture.completedFuture(new ExecutableTable() {
+                @Override
+                public ScannableTable scannableTable() {
+                    // Resolved on every call, so the lookup always reflects the current content of the map.
+                    ScannableTable scannableTable = tablesByName.get(table.name());
+
+                    assert scannableTable != null : "DataProvider is not configured for table " + table.name();
+
+                    return scannableTable;
+                }
+
+                @Override
+                public UpdatableTable updatableTable() {
+                    throw new UnsupportedOperationException();
+                }
+
+                @Override
+                public TableDescriptor tableDescriptor() {
+                    return table.descriptor();
+                }
+            });
+        }
+    }
+
+    /**
+     * Converts a column definition of the cluster table builder into catalog {@link ColumnParams}.
+     *
+     * <p>Type attributes (precision, scale, length) are copied from the native type for the types that carry them.
+     *
+     * @param name Name of the column.
+     * @param type Native type of the column.
+     * @param nullable Nullability flag of the column.
+     * @param defaultValue Value handed to {@code DefaultValue.constant} as is.
+     * @return Column parameters.
+     * @throws IllegalArgumentException If the type spec is not one of those handled below.
+     */
+    private static ColumnParams columnParams(String name, NativeType type, boolean nullable, @Nullable Object defaultValue) {
+        NativeTypeSpec spec = type.spec();
+
+        Builder params = ColumnParams.builder();
+
+        params.name(name);
+        params.type(spec.asColumnType());
+        params.nullable(nullable);
+        params.defaultValue(DefaultValue.constant(defaultValue));
+
+        switch (spec) {
+            case NUMBER: {
+                assert type instanceof NumberNativeType : type.getClass().getCanonicalName();
+
+                NumberNativeType numberType = (NumberNativeType) type;
+
+                params.precision(numberType.precision());
+                break;
+            }
+            case DECIMAL: {
+                assert type instanceof DecimalNativeType : type.getClass().getCanonicalName();
+
+                DecimalNativeType decimalType = (DecimalNativeType) type;
+
+                params.precision(decimalType.precision());
+                params.scale(decimalType.scale());
+                break;
+            }
+            case STRING:
+            case BYTES: {
+                assert type instanceof VarlenNativeType : type.getClass().getCanonicalName();
+
+                VarlenNativeType varlenType = (VarlenNativeType) type;
+
+                params.length(varlenType.length());
+                break;
+            }
+            case BITMASK: {
+                assert type instanceof BitmaskNativeType : type.getClass().getCanonicalName();
+
+                BitmaskNativeType bitmaskType = (BitmaskNativeType) type;
+
+                params.length(bitmaskType.bits());
+                break;
+            }
+            case TIME:
+            case DATETIME:
+            case TIMESTAMP: {
+                assert type instanceof TemporalNativeType : type.getClass().getCanonicalName();
+
+                TemporalNativeType temporalType = (TemporalNativeType) type;
+
+                params.precision(temporalType.precision());
+                break;
+            }
+            // The types below have no additional attributes to copy.
+            case INT8:
+            case INT16:
+            case INT32:
+            case INT64:
+            case FLOAT:
+            case DOUBLE:
+            case DATE:
+            case UUID:
+            case BOOLEAN:
+                break;
+            default:
+                throw new IllegalArgumentException("Unsupported native type: " + spec);
+        }
+
+        return params.build();
+    }
+
+    /**
+     * Projects the given row onto the required elements.
+     *
+     * @param row Source row.
+     * @param requiredElements Indexes of the elements to keep, or {@code null} to keep the row as is.
+     * @return The same {@code row} instance if no projection is requested, otherwise a new array holding
+     *      the selected elements in ascending index order.
+     */
+    private static Object[] project(Object[] row, @Nullable BitSet requiredElements) {
+        if (requiredElements == null) {
+            return row;
+        }
+
+        // BitSet#stream yields the indexes of the set bits in ascending order.
+        return requiredElements.stream()
+                .mapToObj(idx -> row[idx])
+                .toArray();
+    }
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestCluster.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestCluster.java
index 9a660217be..33f2e20041 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestCluster.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestCluster.java
@@ -17,9 +17,11 @@
package org.apache.ignite.internal.sql.engine.framework;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.ignite.internal.sql.engine.exec.LifecycleAware;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -33,9 +35,17 @@ import org.apache.ignite.internal.util.IgniteUtils;
*/
public class TestCluster implements LifecycleAware {
private final Map<String, TestNode> nodeByName;
+ private final List<LifecycleAware> components;
+ private final Runnable initClosure;
- TestCluster(Map<String, TestNode> nodeByName) {
+ TestCluster(
+ Map<String, TestNode> nodeByName,
+ List<LifecycleAware> components,
+ Runnable initClosure
+ ) {
this.nodeByName = nodeByName;
+ this.components = components;
+ this.initClosure = initClosure;
}
/**
@@ -51,16 +61,25 @@ public class TestCluster implements LifecycleAware {
/** {@inheritDoc} */
@Override
public void start() {
+ components.forEach(LifecycleAware::start);
+
nodeByName.values().forEach(TestNode::start);
+
+
+ initClosure.run();
}
/** {@inheritDoc} */
@Override
public void stop() throws Exception {
- List<AutoCloseable> closeables = nodeByName.values().stream()
+ List<AutoCloseable> closeables = Stream.concat(
+ components.stream(),
+ nodeByName.values().stream()
+ )
.map(node -> ((AutoCloseable) node::stop))
.collect(Collectors.toList());
+ Collections.reverse(closeables);
IgniteUtils.closeAll(closeables);
}
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterTest.java
index 0ca5440115..2d6d7c94c6 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterTest.java
@@ -18,19 +18,30 @@
package org.apache.ignite.internal.sql.engine.framework;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.util.BitSet;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.Flow.Publisher;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.PartitionWithTerm;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
+import org.apache.ignite.internal.sql.engine.exec.ScannableTable;
+import org.apache.ignite.internal.sql.engine.exec.exp.RangeCondition;
import org.apache.ignite.internal.sql.engine.prepare.Fragment;
import org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan;
import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
-import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Collation;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.type.NativeTypes;
+import org.apache.ignite.internal.util.SubscriptionUtils;
+import org.apache.ignite.internal.util.subscription.TransformingPublisher;
+import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
@@ -39,24 +50,65 @@ import org.junit.jupiter.api.Test;
*/
public class TestClusterTest extends BaseIgniteAbstractTest {
- private final DataProvider<Object[]> dataProvider = DataProvider.fromRow(
- new Object[]{42, UUID.randomUUID().toString()}, 3_333
- );
+ private final ScannableTable table = new ScannableTable() {
+ @Override
+ public <RowT> Publisher<RowT> scan(ExecutionContext<RowT> ctx,
PartitionWithTerm partWithTerm, RowFactory<RowT> rowFactory,
+ @Nullable BitSet requiredColumns) {
+
+ return new TransformingPublisher<>(
+ SubscriptionUtils.fromIterable(
+ DataProvider.fromRow(
+ new Object[]{42,
UUID.randomUUID().toString()}, 3_333
+ )
+ ), rowFactory::create
+ );
+ }
+
+ @Override
+ public <RowT> Publisher<RowT> indexRangeScan(ExecutionContext<RowT>
ctx, PartitionWithTerm partWithTerm,
+ RowFactory<RowT> rowFactory, int indexId, List<String>
columns, @Nullable RangeCondition<RowT> cond,
+ @Nullable BitSet requiredColumns) {
+
+ return new TransformingPublisher<>(
+ SubscriptionUtils.fromIterable(
+ DataProvider.fromRow(
+ new Object[]{42,
UUID.randomUUID().toString()}, 10
+ )
+ ), rowFactory::create
+ );
+ }
+
+ @Override
+ public <RowT> Publisher<RowT> indexLookup(ExecutionContext<RowT> ctx,
PartitionWithTerm partWithTerm,
+ RowFactory<RowT> rowFactory, int indexId, List<String>
columns, RowT key, @Nullable BitSet requiredColumns) {
+
+ return new TransformingPublisher<>(
+ SubscriptionUtils.fromIterable(
+ DataProvider.fromRow(
+ new Object[]{42,
UUID.randomUUID().toString()}, 1
+ )
+ ), rowFactory::create
+ );
+ }
+ };
// @formatter:off
- private final TestCluster cluster = TestBuilders.cluster().nodes("N1")
+ private final TestCluster cluster = TestBuilders.cluster()
+ .nodes("N1", "N2")
.addTable()
- .name("T1")
- .distribution(IgniteDistributions.hash(List.of(0)))
- .addColumn("ID", NativeTypes.INT32)
- .addColumn("VAL", NativeTypes.stringOf(64))
- .defaultDataProvider(dataProvider)
- .addHashIndex()
- .name("IDX_ID")
- .addColumn("ID")
- .defaultDataProvider(dataProvider)
- .end()
- .end()
+ .name("T1")
+ .addKeyColumn("ID", NativeTypes.INT32)
+ .addColumn("VAL", NativeTypes.stringOf(64))
+ .addSortedIndex()
+ .name("SORTED_IDX")
+ .addColumn("ID", Collation.ASC_NULLS_FIRST)
+ .end()
+ .end()
+ .dataProvider("N1", "T1", table)
+ .dataProvider("N2", "T1", table)
+ // table T2 will be created later by DDL
+ .dataProvider("N1", "T2", table)
+ .dataProvider("N2", "T2", table)
.build();
// @formatter:on
@@ -85,11 +137,30 @@ public class TestClusterTest extends
BaseIgniteAbstractTest {
assertTrue(fragment.root().getInput(0) instanceof IgniteTableScan);
}
- /**
- * Runs a SELECT query with condition.
- */
@Test
- public void testQueryWithCondition() {
+ public void testSimpleFromCreatedTableByDdl() {
+ cluster.start();
+
+ var gatewayNode = cluster.node("N1");
+
+ gatewayNode.initSchema(
+ "CREATE TABLE t2 (id INT PRIMARY KEY, val VARCHAR(64))"
+ );
+
+ QueryPlan plan = gatewayNode.prepare("SELECT * FROM t2");
+
+ for (var row :
await(gatewayNode.executePlan(plan).requestNextAsync(10_000)).items()) {
+ assertNotNull(row);
+ }
+
+ // Ensure the plan contains full table scan.
+ assertTrue(plan instanceof MultiStepPlan);
+ Fragment fragment = ((MultiStepPlan) plan).fragments().get(1);
+ assertTrue(fragment.root().getInput(0) instanceof IgniteTableScan);
+ }
+
+ @Test
+ public void testSelectByKey() {
cluster.start();
TestNode gatewayNode = cluster.node("N1");
@@ -103,5 +174,24 @@ public class TestClusterTest extends
BaseIgniteAbstractTest {
assertTrue(plan instanceof MultiStepPlan);
Fragment fragment = ((MultiStepPlan) plan).fragments().get(1);
assertTrue(fragment.root().getInput(0) instanceof IgniteIndexScan);
+ assertEquals("T1_PK", ((IgniteIndexScan)
fragment.root().getInput(0)).indexName());
+ }
+
+ @Test
+ public void testSelectRange() {
+ cluster.start();
+
+ TestNode gatewayNode = cluster.node("N1");
+ QueryPlan plan = gatewayNode.prepare("SELECT * FROM t1 WHERE ID > 1");
+
+ for (List<?> row :
await(gatewayNode.executePlan(plan).requestNextAsync(10_000)).items()) {
+ assertNotNull(row);
+ }
+
+ // Ensure the plan uses index.
+ assertTrue(plan instanceof MultiStepPlan);
+ Fragment fragment = ((MultiStepPlan) plan).fragments().get(1);
+ assertTrue(fragment.root().getInput(0) instanceof IgniteIndexScan);
+ assertEquals("SORTED_IDX", ((IgniteIndexScan)
fragment.root().getInput(0)).indexName());
}
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestIndex.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestIndex.java
index 4f3e891ceb..188f612a9c 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestIndex.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestIndex.java
@@ -17,10 +17,7 @@
package org.apache.ignite.internal.sql.engine.framework;
-import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
-
import java.util.List;
-import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.calcite.rel.RelCollation;
import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
@@ -29,23 +26,18 @@ import
org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
/**
- * A test index that implements all the necessary for the optimizer methods to
be used to prepare a query, as well as provides access to the
- * data to use this index in execution-related scenarios.
+ * A test index that implements all the methods the optimizer needs to prepare a query.
*/
public class TestIndex extends IgniteIndex {
- private static final String DATA_PROVIDER_NOT_CONFIGURED_MESSAGE_TEMPLATE =
- "DataProvider is not configured [index={}, node={}]";
-
/** Factory method for creating hash-index. */
static TestIndex createHash(
String name,
List<String> columns,
- TableDescriptor tableDescriptor,
- Map<String, DataProvider<?>> dataProviders
+ TableDescriptor tableDescriptor
) {
RelCollation collation = TraitUtils.createCollation(columns, null,
tableDescriptor);
- return new TestIndex(name, Type.HASH, tableDescriptor.distribution(),
collation, dataProviders);
+ return new TestIndex(name, Type.HASH, tableDescriptor.distribution(),
collation);
}
/** Factory method for creating sorted-index. */
@@ -53,44 +45,22 @@ public class TestIndex extends IgniteIndex {
String name,
List<String> columns,
List<Collation> collations,
- TableDescriptor tableDescriptor,
- Map<String, DataProvider<?>> dataProviders
+ TableDescriptor tableDescriptor
) {
RelCollation collation = TraitUtils.createCollation(columns,
collations, tableDescriptor);
- return new TestIndex(name, Type.SORTED,
tableDescriptor.distribution(), collation, dataProviders);
+ return new TestIndex(name, Type.SORTED,
tableDescriptor.distribution(), collation);
}
private static final AtomicInteger ID = new AtomicInteger();
- private final Map<String, DataProvider<?>> dataProviders;
-
/** Constructor. */
TestIndex(
String name,
Type type,
IgniteDistribution distribution,
- RelCollation collation,
- Map<String, DataProvider<?>> dataProviders
+ RelCollation collation
) {
super(ID.incrementAndGet(), name, type, distribution, collation);
-
- this.dataProviders = dataProviders;
- }
-
- /**
- * Returns the data provider for the given node.
- *
- * @param nodeName Name of the node of interest.
- * @param <RowT> A type of the rows the data provider should produce.
- * @return A data provider for the node of interest.
- * @throws AssertionError in case data provider is not configured for the
given node.
- */
- <RowT> DataProvider<RowT> dataProvider(String nodeName) {
- if (!dataProviders.containsKey(nodeName)) {
- throw new
AssertionError(format(DATA_PROVIDER_NOT_CONFIGURED_MESSAGE_TEMPLATE, name(),
nodeName));
- }
-
- return (DataProvider<RowT>) dataProviders.get(nodeName);
}
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
index 29359eaebc..fd7d59f7f4 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
@@ -18,56 +18,44 @@
package org.apache.ignite.internal.sql.engine.framework;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
-import static
org.apache.ignite.internal.sql.engine.exec.ExecutionServiceImplTest.PLANNING_TIMEOUT;
import static
org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.Mockito.mock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.calcite.tools.Frameworks;
-import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.sql.engine.QueryCancel;
+import org.apache.ignite.internal.sql.engine.SqlQueryType;
import org.apache.ignite.internal.sql.engine.exec.ExchangeService;
import org.apache.ignite.internal.sql.engine.exec.ExchangeServiceImpl;
+import org.apache.ignite.internal.sql.engine.exec.ExecutableTableRegistry;
import org.apache.ignite.internal.sql.engine.exec.ExecutionDependencyResolver;
import
org.apache.ignite.internal.sql.engine.exec.ExecutionDependencyResolverImpl;
import org.apache.ignite.internal.sql.engine.exec.ExecutionService;
import org.apache.ignite.internal.sql.engine.exec.ExecutionServiceImpl;
import org.apache.ignite.internal.sql.engine.exec.LifecycleAware;
-import org.apache.ignite.internal.sql.engine.exec.LogicalRelImplementor;
import org.apache.ignite.internal.sql.engine.exec.MailboxRegistry;
import org.apache.ignite.internal.sql.engine.exec.MailboxRegistryImpl;
-import org.apache.ignite.internal.sql.engine.exec.NoOpExecutableTableRegistry;
import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
import org.apache.ignite.internal.sql.engine.exec.mapping.MappingService;
-import org.apache.ignite.internal.sql.engine.exec.rel.Node;
-import org.apache.ignite.internal.sql.engine.exec.rel.ScanNode;
import org.apache.ignite.internal.sql.engine.message.MessageService;
import org.apache.ignite.internal.sql.engine.message.MessageServiceImpl;
import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
-import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl;
import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
-import
org.apache.ignite.internal.sql.engine.prepare.ddl.DdlSqlToCommandConverter;
-import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
-import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
import org.apache.ignite.internal.sql.engine.sql.ParsedResult;
import org.apache.ignite.internal.sql.engine.sql.ParserService;
-import org.apache.ignite.internal.sql.engine.sql.ParserServiceImpl;
import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
-import org.apache.ignite.internal.sql.engine.util.EmptyCacheFactory;
-import org.apache.ignite.internal.sql.engine.util.HashFunctionFactoryImpl;
-import org.apache.ignite.internal.sql.engine.util.cache.CaffeineCacheFactory;
import org.apache.ignite.internal.util.AsyncCursor;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.StringUtils;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessagingService;
import org.apache.ignite.network.TopologyService;
@@ -96,13 +84,16 @@ public class TestNode implements LifecycleAware {
TestNode(
String nodeName,
ClusterService clusterService,
+ ParserService parserService,
+ PrepareService prepareService,
SqlSchemaManager schemaManager,
- MappingService mappingService
+ MappingService mappingService,
+ ExecutableTableRegistry tableRegistry,
+ DdlCommandHandler ddlCommandHandler
) {
this.nodeName = nodeName;
- var ps = new PrepareServiceImpl(nodeName, 0,
CaffeineCacheFactory.INSTANCE,
- mock(DdlSqlToCommandConverter.class), PLANNING_TIMEOUT,
mock(MetricManager.class));
- this.prepareService = registerService(ps);
+ this.parserService = parserService;
+ this.prepareService = prepareService;
this.schemaManager = schemaManager;
TopologyService topologyService = clusterService.topologyService();
@@ -113,52 +104,27 @@ public class TestNode implements LifecycleAware {
QueryTaskExecutor taskExecutor = registerService(new
QueryTaskExecutorImpl(nodeName));
MessageService messageService = registerService(new MessageServiceImpl(
- topologyService.localMember().name(), messagingService,
taskExecutor, new IgniteSpinBusyLock()
+ nodeName, messagingService, taskExecutor, new
IgniteSpinBusyLock()
));
ExchangeService exchangeService = registerService(new
ExchangeServiceImpl(
mailboxRegistry, messageService
));
- NoOpExecutableTableRegistry executableTableRegistry = new
NoOpExecutableTableRegistry();
ExecutionDependencyResolver dependencyResolver = new
ExecutionDependencyResolverImpl(
- executableTableRegistry, null
+ tableRegistry, null
);
- executionService = registerService(new ExecutionServiceImpl<>(
- messageService,
+ executionService = registerService(ExecutionServiceImpl.create(
topologyService,
- mappingService,
+ messageService,
schemaManager,
- mock(DdlCommandHandler.class),
+ ddlCommandHandler,
taskExecutor,
rowHandler,
- dependencyResolver,
- (ctx, deps) -> new LogicalRelImplementor<Object[]>(
- ctx,
- new HashFunctionFactoryImpl<>(rowHandler),
- mailboxRegistry,
- exchangeService,
- deps
- ) {
- @Override
- public Node<Object[]> visit(IgniteTableScan rel) {
- DataProvider<Object[]> dataProvider =
rel.getTable().unwrap(TestTable.class).dataProvider(ctx.localNode().name());
-
- return new ScanNode<>(ctx, dataProvider);
- }
-
- @Override
- public Node<Object[]> visit(IgniteIndexScan rel) {
- TestTable tbl = rel.getTable().unwrap(TestTable.class);
- TestIndex idx = (TestIndex)
tbl.indexes().get(rel.indexName());
-
- DataProvider<Object[]> dataProvider =
idx.dataProvider(ctx.localNode().name());
-
- return new ScanNode<>(ctx, dataProvider);
- }
- }
+ mailboxRegistry,
+ exchangeService,
+ mappingService,
+ dependencyResolver
));
-
- parserService = new ParserServiceImpl(0, EmptyCacheFactory.INSTANCE);
}
/** {@inheritDoc} */
@@ -221,6 +187,36 @@ public class TestNode implements LifecycleAware {
return await(prepareService.prepareAsync(parsedResult,
createContext()));
}
+    /**
+     * Executes the given script.
+     *
+     * <p>This method splits the given string by semicolon and processes the resulting statements
+     * one by one. Leading {@code --} line comments are stripped from every fragment, so a statement
+     * preceded by a comment is still executed; fragments that are blank or consist only of such
+     * comments are skipped.
+     *
+     * <p>Only DDL and DML statements are executed. Any other statement (a SELECT, for instance)
+     * is prepared but not executed, since this method returns nothing.
+     *
+     * <p>Note: the split is purely textual, so a semicolon inside a string literal or a comment
+     * is treated as a statement separator as well.
+     *
+     * @param script Script to execute.
+     */
+    public void initSchema(String script) {
+        for (String fragment : script.split(";")) {
+            // Drop comments in front of the statement instead of discarding the whole fragment:
+            // otherwise the statement following a comment line would be silently lost.
+            String statement = stripLeadingLineComments(fragment);
+
+            if (StringUtils.nullOrBlank(statement)) {
+                continue;
+            }
+
+            ParsedResult parsedResult = parserService.parse(statement);
+            BaseQueryContext ctx = createContext();
+
+            QueryPlan plan = await(prepareService.prepareAsync(parsedResult, ctx));
+
+            // Statements other than DDL and DML are not executed.
+            if (plan.type() != SqlQueryType.DDL && plan.type() != SqlQueryType.DML) {
+                continue;
+            }
+
+            AsyncCursor<?> cursor = executionService.executePlan(new NoOpTransaction("tx"), plan, ctx);
+
+            // Request a single row and wait for it before moving on to the next statement.
+            await(cursor.requestNextAsync(1));
+        }
+    }
+
+    /**
+     * Removes leading {@code --} line comments, together with the surrounding whitespace, from
+     * the given script fragment.
+     *
+     * @param fragment A piece of the script located between two semicolons.
+     * @return The fragment without leading comments, or an empty string if nothing else is left.
+     */
+    private static String stripLeadingLineComments(String fragment) {
+        String rest = fragment.trim();
+
+        while (rest.startsWith("--")) {
+            int lineEnd = rest.indexOf('\n');
+
+            if (lineEnd < 0) {
+                // The comment runs up to the end of the fragment: nothing is left to execute.
+                return "";
+            }
+
+            rest = rest.substring(lineEnd + 1).trim();
+        }
+
+        return rest;
+    }
+
private BaseQueryContext createContext() {
return BaseQueryContext.builder()
.cancel(new QueryCancel())
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java
index eee7c3288c..651e74d55a 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java
@@ -38,7 +38,6 @@ import
org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
import org.apache.ignite.internal.sql.engine.property.PropertiesHolder;
import org.apache.ignite.internal.sql.engine.session.SessionId;
import org.apache.ignite.internal.sql.engine.session.SessionInfo;
-import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.internal.util.AsyncCursor;
@@ -71,11 +70,10 @@ public class QueryCheckerTest extends
BaseIgniteAbstractTest {
.name("T1")
.addKeyColumn("ID", NativeTypes.INT32)
.addColumn("VAL", NativeTypes.INT32)
- .distribution(IgniteDistributions.hash(List.of(0)))
- .defaultDataProvider(DataProvider.fromCollection(List.of(
- new Object[] {1, 1}, new Object[] {2, 2}
- )))
.end()
+ .dataProvider(NODE_NAME, "T1",
TestBuilders.tableScan(DataProvider.fromCollection(List.of(
+ new Object[] {1, 1}, new Object[] {2, 2}
+ ))))
.build();
// @formatter:on
diff --git
a/modules/sql-engine/src/test/resources/tpch/schema_definition_ddl.sql
b/modules/sql-engine/src/test/resources/tpch/schema_definition_ddl.sql
new file mode 100644
index 0000000000..18f080cf3d
--- /dev/null
+++ b/modules/sql-engine/src/test/resources/tpch/schema_definition_ddl.sql
@@ -0,0 +1,125 @@
+-- noinspection SqlDialectInspectionForFile
+-- noinspection SqlNoDataSourceInspectionForFile
+-- TPC-H schema definition: drops and re-creates the tables region, nation, part, supplier,
+-- partsupp, customer, orders and lineitem together with their secondary indexes.
+-- NOTE(review): the test framework loads scripts by splitting them on the semicolon character,
+-- so keep that character out of comments in this file - confirm against the loader before relying on it.
+
+DROP TABLE IF EXISTS nation;
+DROP TABLE IF EXISTS region;
+DROP TABLE IF EXISTS part;
+DROP TABLE IF EXISTS supplier;
+DROP TABLE IF EXISTS partsupp;
+DROP TABLE IF EXISTS orders;
+DROP TABLE IF EXISTS customer;
+DROP TABLE IF EXISTS lineitem;
+
+CREATE TABLE region (
+ r_regionkey integer NOT NULL,
+ r_name char(25) NOT NULL,
+ r_comment varchar(152),
+ PRIMARY KEY (r_regionkey)
+);
+
+CREATE TABLE nation (
+ n_nationkey integer NOT NULL,
+ n_name char(25) NOT NULL,
+ n_regionkey integer NOT NULL,
+ n_comment varchar(152),
+ PRIMARY KEY (n_nationkey)
+);
+
+CREATE INDEX n_rk ON nation (n_regionkey ASC);
+
+CREATE TABLE part (
+ p_partkey integer NOT NULL,
+ p_name varchar(55) NOT NULL,
+ p_mfgr char(25) NOT NULL,
+ p_brand char(10) NOT NULL,
+ p_type varchar(25) NOT NULL,
+ p_size integer NOT NULL,
+ p_container char(10) NOT NULL,
+ p_retailprice decimal(15, 2) NOT NULL,
+ p_comment varchar(23) NOT NULL,
+ PRIMARY KEY (p_partkey)
+);
+
+
+CREATE TABLE supplier (
+ s_suppkey integer NOT NULL,
+ s_name char(25) NOT NULL,
+ s_address varchar(40) NOT NULL,
+ s_nationkey integer NOT NULL,
+ s_phone char(15) NOT NULL,
+ s_acctbal decimal(15, 2) NOT NULL,
+ s_comment varchar(101) NOT NULL,
+ PRIMARY KEY (s_suppkey)
+);
+
+CREATE INDEX s_nk ON supplier (s_nationkey ASC);
+
+CREATE TABLE partsupp (
+ ps_partkey integer NOT NULL,
+ ps_suppkey integer NOT NULL,
+ ps_availqty integer NOT NULL,
+ ps_supplycost decimal(15, 2) NOT NULL,
+ ps_comment varchar(199) NOT NULL,
+ PRIMARY KEY (ps_partkey, ps_suppkey)
+);
+
+CREATE INDEX ps_sk ON partsupp (ps_suppkey ASC);
+CREATE INDEX ps_sk_pk ON partsupp (ps_suppkey ASC, ps_partkey ASC);
+
+CREATE TABLE customer (
+ c_custkey integer NOT NULL,
+ c_name varchar(25) NOT NULL,
+ c_address varchar(40) NOT NULL,
+ c_nationkey integer NOT NULL,
+ c_phone char(15) NOT NULL,
+ c_acctbal decimal(15, 2) NOT NULL,
+ c_mktsegment char(10) NOT NULL,
+ c_comment varchar(117) NOT NULL,
+ PRIMARY KEY (c_custkey)
+);
+
+CREATE INDEX c_nk ON customer (c_nationkey ASC);
+
+CREATE TABLE orders (
+ o_orderkey integer NOT NULL,
+ o_custkey integer NOT NULL,
+ o_orderstatus char(1) NOT NULL,
+ o_totalprice decimal(15, 2) NOT NULL,
+ o_orderdate date NOT NULL,
+ o_orderpriority char(15) NOT NULL,
+ o_clerk char(15) NOT NULL,
+ o_shippriority integer NOT NULL,
+ o_comment varchar(79) NOT NULL,
+ PRIMARY KEY (o_orderkey)
+);
+
+CREATE INDEX o_ck ON orders (o_custkey ASC);
+CREATE INDEX o_od ON orders (o_orderdate ASC);
+
+CREATE TABLE lineitem (
+ l_orderkey integer NOT NULL,
+ l_partkey integer NOT NULL,
+ l_suppkey integer NOT NULL,
+ l_linenumber integer NOT NULL,
+ l_quantity decimal(15, 2) NOT NULL,
+ l_extendedprice decimal(15, 2) NOT NULL,
+ l_discount decimal(15, 2) NOT NULL,
+ l_tax decimal(15, 2) NOT NULL,
+ l_returnflag char(1) NOT NULL,
+ l_linestatus char(1) NOT NULL,
+ l_shipdate date NOT NULL,
+ l_commitdate date NOT NULL,
+ l_receiptdate date NOT NULL,
+ l_shipinstruct char(25) NOT NULL,
+ l_shipmode char(10) NOT NULL,
+ l_comment varchar(44) NOT NULL,
+ PRIMARY KEY (l_orderkey, l_linenumber)
+);
+
+CREATE INDEX l_pk ON lineitem (l_partkey ASC);
+CREATE INDEX l_sk ON lineitem (l_suppkey ASC);
+CREATE INDEX l_sd ON lineitem (l_shipdate ASC);
+CREATE INDEX l_cd ON lineitem (l_commitdate ASC);
+CREATE INDEX l_rd ON lineitem (l_receiptdate ASC);
+CREATE INDEX l_pk_sk ON lineitem (l_partkey ASC, l_suppkey ASC);
+CREATE INDEX l_sk_pk ON lineitem (l_suppkey ASC, l_partkey ASC);