This is an automated email from the ASF dual-hosted git repository.

korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new a0bbb71e64 IGNITE-21721 Sql. Adjust cost estimation for index scan 
(#3579)
a0bbb71e64 is described below

commit a0bbb71e6428c205f8e565ca3f51b2a4fc82211c
Author: korlov42 <[email protected]>
AuthorDate: Wed Apr 10 12:10:05 2024 +0300

    IGNITE-21721 Sql. Adjust cost estimation for index scan (#3579)
---
 .../internal/benchmark/SqlIndexScanBenchmark.java  | 240 +++++++++++++++++++++
 .../ignite/internal/benchmark/TpchBenchmark.java   |  16 +-
 .../internal/sql/engine/ItSecondaryIndexTest.java  |   5 +-
 .../ignite/internal/sql/engine/ItSetOpTest.java    |   1 -
 .../sql/engine/exec/DestinationFactory.java        |   4 +-
 .../engine/exec/RehashingPartitionExtractor.java   |   4 +
 .../sql/engine/metadata/IgniteMdSelectivity.java   |  38 ----
 .../sql/engine/metadata/cost/IgniteCost.java       |   9 +
 .../sql/engine/prepare/PrepareServiceImpl.java     |   8 +-
 .../internal/sql/engine/rel/AbstractIndexScan.java |   9 +-
 .../internal/sql/engine/rel/IgniteProject.java     |  10 +-
 .../engine/rule/HashAggregateConverterRule.java    |   4 +-
 .../sql/engine/planner/AbstractPlannerTest.java    |  16 +-
 .../sql/engine/planner/AggregatePlannerTest.java   |  50 +++--
 .../planner/IndexSearchBoundsPlannerTest.java      |   2 +-
 .../planner/MapReduceHashAggregatePlannerTest.java |   6 +-
 .../src/test/resources/tpch/plan/q1.plan           |   4 +-
 .../internal/sql/engine/util/tpch/TpchTables.java  | 168 ++++++++++++++-
 .../resources/tpch/ddl/lineitem_ddl.sql            |   2 -
 .../resources/tpch/ddl/partsupp_ddl.sql            |   1 -
 20 files changed, 502 insertions(+), 95 deletions(-)

diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlIndexScanBenchmark.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlIndexScanBenchmark.java
new file mode 100644
index 0000000000..ef10f8bde9
--- /dev/null
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlIndexScanBenchmark.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmark;
+
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+
+import java.nio.file.Files;
+import java.time.LocalDate;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.util.SubscriptionUtils;
+import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.table.DataStreamerItem;
+import org.apache.ignite.table.DataStreamerOptions;
+import org.apache.ignite.table.Tuple;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/**
+ * Benchmark that compares sequential scanning of an index against a full table 
scan.
+ */
+@State(Scope.Benchmark)
+@Fork(1)
+@Threads(1)
+@Warmup(iterations = 10, time = 2)
+@Measurement(iterations = 20, time = 2)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@SuppressWarnings({"WeakerAccess", "unused"})
+public class SqlIndexScanBenchmark extends AbstractMultiNodeBenchmark {
+    /*
+        By default, cluster's work directory will be created as a temporary 
folder. This implies,
+        that all data generated by benchmark will be cleared automatically. 
However, this also implies
+        that cluster will be recreated on EVERY RUN. To initialize cluster 
once and then reuse its state
+        override `AbstractMultiNodeBenchmark.workDir()` method. Don't forget 
to clear that directory
+        afterwards.
+     */
+
+    private static final String DATASET_READY_MARK_FILE_NAME = "ready.txt";
+
+    private static final String SELECT_STATEMENT_TEMPLATE = "SELECT {} t.* 
FROM test t WHERE val >= ? LIMIT ?";
+
+    private static final int TABLE_SIZE = 1_500_000;
+    private static final LocalDate INITIAL_DATE = LocalDate.of(1970, 1, 1);
+
+    @Param({"1", "1000", "10000", "100000"})
+    private int limit;
+
+    @Param({"FIRST_N", "LAST_N"})
+    private ScanMode scanMode;
+
+    private IgniteSql sql;
+    private LocalDate startDate;
+
+    /** Initializes a schema and fills tables with data. */
+    @Setup
+    public void setUp() throws Exception {
+        try {
+            sql = clusterNode.sql();
+
+            if 
(!Files.exists(workDir().resolve(DATASET_READY_MARK_FILE_NAME))) {
+                sql.executeScript(
+                        "CREATE ZONE single_partition_zone WITH replicas = 1, 
partitions = 1;"
+                                + "CREATE TABLE test (id INT PRIMARY KEY, val 
DATE) WITH primary_zone = single_partition_zone;"
+                                + "CREATE INDEX test_val_idx ON test(val);"
+                );
+
+                CompletableFuture<?> result = 
clusterNode.tables().table("test")
+                        .recordView()
+                        .streamData(SubscriptionUtils.fromIterable(() -> 
IntStream.range(0, TABLE_SIZE)
+                                .mapToObj(i -> 
DataStreamerItem.of(Tuple.create()
+                                        .set("id", i)
+                                        .set("val", INITIAL_DATE.plusDays(i)))
+                                ).iterator()), DataStreamerOptions.DEFAULT);
+
+                result.get(15, TimeUnit.MINUTES);
+
+                
Files.createFile(workDir().resolve(DATASET_READY_MARK_FILE_NAME));
+            }
+
+            startDate = scanMode.valueForPredicate(limit);
+        } catch (Exception e) {
+            nodeTearDown();
+
+            throw e;
+        }
+    }
+
+    /** Measures performance of scan over a table. */
+    @Benchmark
+    public void forceTableScan(ForceTableScanState state, Blackhole bh) {
+        try (var rs = sql.execute(null, state.query, startDate, limit)) {
+            while (rs.hasNext()) {
+                bh.consume(rs.next());
+            }
+        }
+    }
+
+    /** Measures performance of scan over an index. */
+    @Benchmark
+    public void forceIndexScan(ForceIndexScanState state, Blackhole bh) {
+        try (var rs = sql.execute(null, state.query, startDate, limit)) {
+            while (rs.hasNext()) {
+                bh.consume(rs.next());
+            }
+        }
+    }
+
+    /** Measures performance of an optimizer's decision about what to use for 
a particular query. */
+    @Benchmark
+    public void optimizatorChoiceScan(OptimizatorChoiceState state, Blackhole 
bh) {
+        try (var rs = sql.execute(null, state.query, startDate, limit)) {
+            while (rs.hasNext()) {
+                bh.consume(rs.next());
+            }
+        }
+    }
+
+    /**
+     * Benchmark's entry point.
+     */
+    public static void main(String[] args) throws RunnerException {
+        Options opt = new OptionsBuilder()
+                .include(".*" + SqlIndexScanBenchmark.class.getSimpleName() + 
".*")
+                .build();
+
+        new Runner(opt).run();
+    }
+
+    /**
+     * State that keeps a query that prevents the SQL engine from using indexes.
+     */
+    @State(Scope.Benchmark)
+    public static class ForceTableScanState {
+        private String query;
+
+        @Setup
+        public void setup() {
+            query = format(SELECT_STATEMENT_TEMPLATE, "/*+ NO_INDEX */");
+        }
+    }
+
+    /**
+     * State that keeps a query that forces the SQL engine to use an index.
+     */
+    @State(Scope.Benchmark)
+    public static class ForceIndexScanState {
+        private String query;
+
+        @Setup
+        public void setup() {
+            query = format(SELECT_STATEMENT_TEMPLATE, "/*+ 
FORCE_INDEX(test_val_idx) */");
+        }
+    }
+
+    /**
+     * State that keeps a query without any hints about which access method to 
prefer.
+     */
+    @State(Scope.Benchmark)
+    public static class OptimizatorChoiceState {
+        private String query;
+
+        @Setup
+        public void setup() {
+            query = format(SELECT_STATEMENT_TEMPLATE, "");
+        }
+    }
+
+    @Override
+    protected int nodes() {
+        return 1;
+    }
+
+    @Override
+    protected void createTable(String tableName) {
+        // NO-OP
+    }
+
+    /**
+     * Enumerates scanning modes used in benchmarks.
+     */
+    public enum ScanMode {
+        /**
+         * In this mode we will scan the first N rows in the order they appear.
+         */
+        FIRST_N {
+            @Override
+            LocalDate valueForPredicate(int limit) {
+                return INITIAL_DATE;
+            }
+        },
+        /**
+         * In this mode we will scan last N rows in ascending order.
+         *
+         * <p>For table scan this basically means scanning of the whole table. 
For index scan
+         * this means positioning to the end of the index and scanning last N 
rows.
+         */
+        LAST_N {
+            @Override
+            LocalDate valueForPredicate(int limit) {
+                return INITIAL_DATE.plusDays(TABLE_SIZE - limit);
+            }
+        };
+
+        abstract LocalDate valueForPredicate(int limit);
+    }
+}
+
+
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/TpchBenchmark.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/TpchBenchmark.java
index c1c29ac278..14e4a32d4c 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/TpchBenchmark.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/TpchBenchmark.java
@@ -87,7 +87,6 @@ public class TpchBenchmark extends AbstractMultiNodeBenchmark 
{
 
     /** Initializes a schema and fills tables with data. */
     @Setup
-    @SuppressWarnings("ConstantValue")
     public void setUp() throws Exception {
         try {
             sql = clusterNode.sql();
@@ -140,13 +139,18 @@ public class TpchBenchmark extends 
AbstractMultiNodeBenchmark {
         new Runner(opt).run();
     }
 
-    private void fillTable(TpchTables table) throws IOException {
+    private void fillTable(TpchTables table) {
         System.out.println("Going to fill table \"" + table.tableName() + 
"\"...");
         long start = System.nanoTime();
         int count = 0;
-        for (String line : 
Files.readAllLines(pathToDataset.resolve(table.tableName() + ".tbl"))) {
-            Object[] params = line.split("\\|");
-
+        Iterable<Object[]> dataProvider = () -> {
+            try {
+                return table.dataProvider(pathToDataset);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        };
+        for (Object[] params : dataProvider) {
             sql.execute(null, table.insertPrepareStatement(), params);
 
             if (++count % 10_000 == 0) {
@@ -161,5 +165,3 @@ public class TpchBenchmark extends 
AbstractMultiNodeBenchmark {
         // NO-OP
     }
 }
-
-
diff --git 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSecondaryIndexTest.java
 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSecondaryIndexTest.java
index 9511e15002..41ce7cea2f 100644
--- 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSecondaryIndexTest.java
+++ 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSecondaryIndexTest.java
@@ -824,7 +824,10 @@ public class ItSecondaryIndexTest extends 
BaseSqlIntegrationTest {
 
     @Test
     public void testNotNullCondition() {
-        assertQuery("SELECT * FROM T1 WHERE val is not null")
+        // IS NOT NULL predicate has low selectivity, thus, given the cost of 
the index scan,
+        // it's considered cheaper to scan the whole table instead. Let's 
force the planner to use
+        // the index of interest.
+        assertQuery("SELECT /*+ FORCE_INDEX(t1_idx) */ t1.* FROM T1 WHERE val 
is not null")
                 .matches(containsIndexScan("PUBLIC", "T1", "T1_IDX"))
                 .matches(not(containsUnion()))
                 .returns(3, 3)
diff --git 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSetOpTest.java
 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSetOpTest.java
index 2f425bed30..1cb1e03d59 100644
--- 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSetOpTest.java
+++ 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSetOpTest.java
@@ -350,7 +350,6 @@ public class ItSetOpTest extends BaseSqlIntegrationTest {
     }
 
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-21921";)
     public void testUnionWithDistinct() {
         var rows = sql(
                 "SELECT distinct(name) FROM emp1 UNION SELECT name from emp2");
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/DestinationFactory.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/DestinationFactory.java
index 6c5f13e7e2..ba99a8a5fc 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/DestinationFactory.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/DestinationFactory.java
@@ -92,9 +92,11 @@ class DestinationFactory<RowT> {
                     return new Identity<>(rowHandler, keys.get(0), 
group.nodeNames());
                 }
 
-                assert !nullOrEmpty(group.assignments()) && !nullOrEmpty(keys);
+                assert !nullOrEmpty(keys);
 
                 if (function.affinity()) {
+                    assert !nullOrEmpty(group.assignments());
+
                     int tableId = ((AffinityDistribution) function).tableId();
                     Supplier<PartitionCalculator> calculator = 
dependencies.partitionCalculator(tableId);
                     TableDescriptor tableDescriptor = 
dependencies.tableDescriptor(tableId);
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RehashingPartitionExtractor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RehashingPartitionExtractor.java
index 496b492924..220c088ccf 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RehashingPartitionExtractor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RehashingPartitionExtractor.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.sql.engine.exec;
 
+import static org.apache.ignite.internal.util.ArrayUtils.nullOrEmpty;
+
 import org.apache.ignite.internal.util.IgniteUtils;
 
 /** Extract assignment based on incoming row. */
@@ -31,6 +33,8 @@ public class RehashingPartitionExtractor<RowT> implements 
RowPartitionExtractor<
             int[] fields,
             RowHandler<RowT> rowHandler
     ) {
+        assert !nullOrEmpty(fields);
+
         this.targetCount = targetCount;
         this.fields = fields;
         this.rowHandler = rowHandler;
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/IgniteMdSelectivity.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/IgniteMdSelectivity.java
index 78876a4356..d632aaa425 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/IgniteMdSelectivity.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/IgniteMdSelectivity.java
@@ -17,10 +17,6 @@
 
 package org.apache.ignite.internal.sql.engine.metadata;
 
-import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
-
-import java.util.List;
-import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMdSelectivity;
 import org.apache.calcite.rel.metadata.RelMdUtil;
@@ -28,14 +24,12 @@ import org.apache.calcite.rel.metadata.RelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.util.BuiltInMethod;
 import org.apache.ignite.internal.sql.engine.prepare.bounds.ExactBounds;
 import org.apache.ignite.internal.sql.engine.prepare.bounds.MultiBounds;
 import org.apache.ignite.internal.sql.engine.prepare.bounds.RangeBounds;
 import org.apache.ignite.internal.sql.engine.prepare.bounds.SearchBounds;
-import org.apache.ignite.internal.sql.engine.rel.AbstractIndexScan;
 import org.apache.ignite.internal.sql.engine.rel.IgniteHashIndexSpool;
 import org.apache.ignite.internal.sql.engine.rel.IgniteSortedIndexSpool;
 import 
org.apache.ignite.internal.sql.engine.rel.ProjectableFilterableTableScan;
@@ -50,38 +44,6 @@ public class IgniteMdSelectivity extends RelMdSelectivity {
             ReflectiveRelMetadataProvider.reflectiveSource(
                     BuiltInMethod.SELECTIVITY.method, new 
IgniteMdSelectivity());
 
-    /**
-     * GetSelectivity.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
-     */
-    public Double getSelectivity(AbstractIndexScan rel, RelMetadataQuery mq, 
RexNode predicate) {
-        if (predicate != null) {
-            return getSelectivity((ProjectableFilterableTableScan) rel, mq, 
predicate);
-        }
-
-        List<SearchBounds> searchBounds = rel.searchBounds();
-
-        if (nullOrEmpty(searchBounds)) {
-            return RelMdUtil.guessSelectivity(rel.condition());
-        }
-
-        double idxSelectivity = 1.0;
-
-        List<RexNode> conjunctions = RelOptUtil.conjunctions(rel.condition());
-
-        for (SearchBounds bounds : searchBounds) {
-            if (bounds != null) {
-                conjunctions.remove(bounds.condition());
-            }
-
-            idxSelectivity *= guessCostMultiplier(bounds);
-        }
-
-        RexNode remaining = RexUtil.composeConjunction(RexUtils.builder(rel), 
conjunctions, true);
-
-        return idxSelectivity * RelMdUtil.guessSelectivity(remaining);
-    }
-
     /**
      * GetSelectivity.
      * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/cost/IgniteCost.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/cost/IgniteCost.java
index eea42cdaf2..34ba549036 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/cost/IgniteCost.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/cost/IgniteCost.java
@@ -36,6 +36,15 @@ public class IgniteCost implements RelOptCost {
     /** Cost of a comparison of one row. */
     public static final double ROW_COMPARISON_COST = 3;
 
+    /**
+     * According to benchmarks, deriving a single row from an index is 
approximately 5 times slower than deriving
+     * a single row from a table, so we have to reflect this in cost estimation. 
But in case of multiplier=5,
+     * indexes won't be chosen by the optimiser for a range scan with only one 
bound. Thus, multiplier=4 looks like
+     * a good compromise between reflecting the real state of things and not 
breaking usage of indexes for
+     * range scans.
+     */
+    public static final double INDEX_ROW_SCAN_MULTIPLIER = 4;
+
     /** Memory cost of a aggregate call. */
     public static final double AGG_CALL_MEM_COST = 5;
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
index d8b0b7c3f0..98170fa10c 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
@@ -347,7 +347,13 @@ public class PrepareServiceImpl implements PrepareService {
                     );
                 }
 
-                return new MultiStepPlan(nextPlanId(), SqlQueryType.QUERY, 
clonedTree, resultSetMetadata, parameterMetadata);
+                var plan = new MultiStepPlan(nextPlanId(), SqlQueryType.QUERY, 
clonedTree, resultSetMetadata, parameterMetadata);
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Plan prepared: \n{}\n{}", 
parsedResult.originalQuery(), plan.explain());
+                }
+
+                return plan;
             }, planningPool));
 
             return planFut.thenApply(Function.identity());
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/AbstractIndexScan.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/AbstractIndexScan.java
index d7e416f6ce..71e5dbb79e 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/AbstractIndexScan.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/AbstractIndexScan.java
@@ -143,8 +143,10 @@ public abstract class AbstractIndexScan extends 
ProjectableFilterableTableScan {
             }
         }
 
+        double indexRowPassThroughCost = IgniteCost.ROW_PASS_THROUGH_COST * 
IgniteCost.INDEX_ROW_SCAN_MULTIPLIER;
+
         if (condition == null) {
-            cost = rows * IgniteCost.ROW_PASS_THROUGH_COST;
+            cost = rows * indexRowPassThroughCost;
         } else {
             double selectivity = 1;
 
@@ -163,11 +165,10 @@ public abstract class AbstractIndexScan extends 
ProjectableFilterableTableScan {
                 rows = 1;
             }
 
-            cost += rows * (IgniteCost.ROW_COMPARISON_COST + 
IgniteCost.ROW_PASS_THROUGH_COST);
+            cost += rows * (IgniteCost.ROW_COMPARISON_COST + 
indexRowPassThroughCost);
         }
 
-        // additional tiny cost for preventing equality with table scan.
-        return planner.getCostFactory().makeCost(rows, cost, 
0).plus(planner.getCostFactory().makeTinyCost());
+        return planner.getCostFactory().makeCost(rows, cost, 0);
     }
 
     /**
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteProject.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteProject.java
index 7a05ddc2db..d8273005f0 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteProject.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteProject.java
@@ -229,7 +229,15 @@ public class IgniteProject extends Project implements 
TraitsAwareIgniteRel {
     public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery 
mq) {
         double rowCount = mq.getRowCount(getInput());
 
-        return planner.getCostFactory().makeCost(rowCount, rowCount * 
IgniteCost.ROW_PASS_THROUGH_COST, 0);
+        RelOptCost cost = planner.getCostFactory().makeCost(rowCount, rowCount 
* IgniteCost.ROW_PASS_THROUGH_COST, 0);
+
+        if (distribution() == single()) {
+            // make single distributed projection slightly more expensive to 
help
+            // planner prefer distributed option, if exists
+            cost = cost.plus(planner.getCostFactory().makeTinyCost());
+        }
+
+        return cost;
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateConverterRule.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateConverterRule.java
index 15d72f5f7f..6542b6fc03 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateConverterRule.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateConverterRule.java
@@ -131,8 +131,8 @@ public class HashAggregateConverterRule {
 
                     return new IgniteProject(
                             agg.getCluster(),
-                            reducePhaseTraits,
-                            convert(input, 
inTrait.replace(IgniteDistributions.single())),
+                            input.getTraitSet(),
+                            input,
                             reduceInputExprs,
                             projectRowType
                     );
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
index efbf99e7ae..6acddeed67 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
@@ -791,13 +791,15 @@ public abstract class AbstractPlannerTest extends 
IgniteAbstractTest {
     }
 
     protected Predicate<? extends RelNode> projectFromTable(String tableName, 
String... exprs) {
-        return isInstanceOf(IgniteProject.class)
-                .and(projection -> {
-                    String actualProjStr = projection.getProjects().toString();
-                    String expectedProjStr = Arrays.asList(exprs).toString();
-                    return actualProjStr.equals(expectedProjStr);
-                })
-                .and(hasChildThat(isTableScan(tableName)));
+        return nodeOrAnyChild(
+                isInstanceOf(IgniteProject.class)
+                        .and(projection -> {
+                            String actualProjStr = 
projection.getProjects().toString();
+                            String expectedProjStr = 
Arrays.asList(exprs).toString();
+                            return actualProjStr.equals(expectedProjStr);
+                        })
+                        .and(input(isTableScan(tableName)))
+        );
     }
 
     /**
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
index c41b377775..ded0e04155 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
@@ -186,23 +186,26 @@ public class AggregatePlannerTest extends 
AbstractAggregatePlannerTest {
      */
     @Test
     public void aggregateWithGroupByIndexPrefixColumns() throws Exception {
-        checkAggWithGroupByIndexColumnsSingle(TestCase.CASE_9);
         checkAggWithGroupByIndexColumnsSingle(TestCase.CASE_10);
         checkAggWithGroupByIndexColumnsSingle(TestCase.CASE_11);
 
-        checkAggWithGroupByIndexColumnsSort(TestCase.CASE_9A);
         checkAggWithGroupByIndexColumnsSort(TestCase.CASE_10A);
         checkAggWithGroupByIndexColumnsSort(TestCase.CASE_11A);
 
-        checkAggWithGroupByIndexColumnsSort(TestCase.CASE_9B);
         checkAggWithGroupByIndexColumnsSort(TestCase.CASE_10B);
         checkAggWithGroupByIndexColumnsSort(TestCase.CASE_11B);
 
-        checkAggWithColocatedGroupByIndexColumnsHash(TestCase.CASE_9C);
         checkAggWithColocatedGroupByIndexColumnsHash(TestCase.CASE_10C);
         checkAggWithColocatedGroupByIndexColumnsHash(TestCase.CASE_11C);
 
-        checkAggWithColocatedGroupByIndexColumnsHash(TestCase.CASE_9D);
+        // grouping by a single column results in a lower number of groups, 
this makes
+        // hash aggregate slightly better than sorted because we occupy less 
memory,
+        // and scanning by an index is expensive
+        checkSimpleAggWithGroupBySingle(TestCase.CASE_9);
+        checkAggWithGroupByIndexColumnsHash(TestCase.CASE_9A);
+        checkAggWithGroupByIndexColumnsHash(TestCase.CASE_9B);
+        checkSimpleAggWithColocatedGroupByHash(TestCase.CASE_9C);
+        checkSimpleAggWithColocatedGroupByHash(TestCase.CASE_9D);
     }
 
     /**
@@ -212,11 +215,21 @@ public class AggregatePlannerTest extends 
AbstractAggregatePlannerTest {
     public void distinctWithoutAggregate() throws Exception {
         checkGroupWithNoAggregateSingle(TestCase.CASE_12);
 
+        // even though collation satisfies the grouping,
+        // scan over an index is too expensive; for aggregates
+        // emitting rows with up to 2 columns it's considered
+        // cheaper (according to current cost model) to use hash
+        // aggregates instead
+        checkGroupWithNoAggregateSingle(TestCase.CASE_13);
+
         checkGroupWithNoAggregateHash(TestCase.CASE_12A);
         checkGroupWithNoAggregateHash(TestCase.CASE_12B);
 
         checkColocatedGroupWithNoAggregateHash(TestCase.CASE_12C);
         checkColocatedGroupWithNoAggregateHash(TestCase.CASE_12D);
+
+        checkColocatedGroupWithNoAggregateHash(TestCase.CASE_13C);
+        checkColocatedGroupWithNoAggregateHash(TestCase.CASE_13D);
     }
 
     /**
@@ -224,13 +237,11 @@ public class AggregatePlannerTest extends 
AbstractAggregatePlannerTest {
      */
     @Test
     public void distinctWithoutAggregateUseIndex() throws Exception {
-        checkGroupWithNoAggregateUseIndexSingle(TestCase.CASE_13);
-
+        // hash aggregate adds an extra column for group id, this adds an 
extra cost
+        // to the memory part making two phase hash aggregate slightly more 
expensive
+        // than sorted ones
         checkGroupWithNoAggregateUseIndexHash(TestCase.CASE_13A);
         checkGroupWithNoAggregateUseIndexHash(TestCase.CASE_13B);
-
-        checkColocatedGroupWithNoAggregateUseIndexHash(TestCase.CASE_13C);
-        checkColocatedGroupWithNoAggregateUseIndexHash(TestCase.CASE_13D);
     }
 
     /**
@@ -478,9 +489,9 @@ public class AggregatePlannerTest extends 
AbstractAggregatePlannerTest {
 
         Predicate<RelNode> nonColocated = 
hasChildThat(isInstanceOf(IgniteReduceHashAggregate.class)
                 .and(in -> hasAggregates(sumReduce, 
sum0Reduce).test(in.getAggregateCalls()))
-                .and(input(isInstanceOf(IgniteProject.class)
-                        .and(input(isInstanceOf(IgniteExchange.class)
-                                .and(hasDistribution(single()))
+                .and(input(isInstanceOf(IgniteExchange.class)
+                        .and(hasDistribution(single()))
+                        .and(input(isInstanceOf(IgniteProject.class)
                                 
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
                                                 .and(in -> 
hasAggregates(sumMap, countMap).test(in.getAggCallList()))
                                         )
@@ -709,6 +720,19 @@ public class AggregatePlannerTest extends 
AbstractAggregatePlannerTest {
         );
     }
 
+    private void checkAggWithGroupByIndexColumnsHash(TestCase testCase) throws 
Exception {
+        assertPlan(testCase,
+                nodeOrAnyChild(isInstanceOf(IgniteReduceHashAggregate.class)
+                        .and(input(isInstanceOf(IgniteExchange.class)
+                                
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
+                                        .and(hasAggregate())
+                                        .and(input(isTableScan("TEST")))
+                                ))
+                        ))
+                )
+        );
+    }
+
     private void checkAggWithGroupByIndexColumnsSort(TestCase testCase) throws 
Exception {
         assertPlan(testCase,
                 nodeOrAnyChild(isInstanceOf(IgniteReduceSortAggregate.class)
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/IndexSearchBoundsPlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/IndexSearchBoundsPlannerTest.java
index b805c56f4e..eb659bb65a 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/IndexSearchBoundsPlannerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/IndexSearchBoundsPlannerTest.java
@@ -196,7 +196,7 @@ public class IndexSearchBoundsPlannerTest extends 
AbstractPlannerTest {
 
         assertBounds("SELECT * FROM TEST WHERE C4 IS NULL", exact("null"));
 
-        assertBounds("SELECT * FROM TEST WHERE C4 IS NOT NULL",
+        assertBounds("SELECT /*+ FORCE_INDEX(c4) */ * FROM TEST WHERE C4 IS 
NOT NULL",
                 range(null, "null", true, false));
 
         assertBounds("SELECT * FROM TEST WHERE C4 IN (1, 2, 3) AND C3 > 1",
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java
index 8230d06a47..644718598f 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceHashAggregatePlannerTest.java
@@ -495,9 +495,9 @@ public class MapReduceHashAggregatePlannerTest extends 
AbstractAggregatePlannerT
 
         Predicate<RelNode> nonColocated = 
hasChildThat(isInstanceOf(IgniteReduceHashAggregate.class)
                 .and(in -> hasAggregates(sumReduce, 
sum0Reduce).test(in.getAggregateCalls()))
-                .and(input(isInstanceOf(IgniteProject.class)
-                        .and(input(isInstanceOf(IgniteExchange.class)
-                                
.and(hasDistribution(IgniteDistributions.single()))
+                .and(input(isInstanceOf(IgniteExchange.class)
+                        .and(hasDistribution(IgniteDistributions.single()))
+                        .and(input(isInstanceOf(IgniteProject.class)
                                 
.and(input(isInstanceOf(IgniteMapHashAggregate.class)
                                                 .and(in -> 
hasAggregates(sumMap, countMap).test(in.getAggCallList()))
                                         )
diff --git a/modules/sql-engine/src/test/resources/tpch/plan/q1.plan 
b/modules/sql-engine/src/test/resources/tpch/plan/q1.plan
index af8defab87..763b0cc06b 100644
--- a/modules/sql-engine/src/test/resources/tpch/plan/q1.plan
+++ b/modules/sql-engine/src/test/resources/tpch/plan/q1.plan
@@ -1,7 +1,7 @@
 Sort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC])
   Project(inputs=[0..5], exprs=[[DECIMAL_DIVIDE($6, $7, 15, 2), 
DECIMAL_DIVIDE($8, $9, 15, 2), DECIMAL_DIVIDE($10, $11, 15, 2), 
CAST($12):BIGINT NOT NULL]])
     ReduceHashAggregate(group=[{0, 1}], SUM_QTY=[SUM($2)], 
SUM_BASE_PRICE=[SUM($3)], SUM_DISC_PRICE=[SUM($4)], SUM_CHARGE=[SUM($5)], 
AVG_SUM6=[SUM($6)], AVG_SUM06=[$SUM0($7)], AVG_SUM8=[SUM($8)], 
AVG_SUM08=[$SUM0($9)], AVG_SUM10=[SUM($10)], AVG_SUM010=[$SUM0($11)], 
COUNT_12_MAP_SUM=[$SUM0($12)])
-      Project(inputs=[0..6], exprs=[[CAST($7):DECIMAL(32767, 0) NOT NULL, $8, 
CAST($9):DECIMAL(32767, 0) NOT NULL, $10, CAST($11):DECIMAL(32767, 0) NOT NULL, 
$12, $13]])
-        Exchange(distribution=[single])
+      Exchange(distribution=[single])
+        Project(inputs=[0..6], exprs=[[CAST($7):DECIMAL(32767, 0) NOT NULL, 
$8, CAST($9):DECIMAL(32767, 0) NOT NULL, $10, CAST($11):DECIMAL(32767, 0) NOT 
NULL, $12, $13]])
           MapHashAggregate(group=[{0, 1}], SUM_QTY=[SUM($2)], 
SUM_BASE_PRICE=[SUM($3)], SUM_DISC_PRICE=[SUM($4)], SUM_CHARGE=[SUM($5)], 
AVG_SUM6=[SUM($2)], AVG_COUNT6=[COUNT($2)], AVG_SUM8=[SUM($3)], 
AVG_COUNT8=[COUNT($3)], AVG_SUM10=[SUM($6)], AVG_COUNT10=[COUNT($6)], 
COUNT_ORDER=[COUNT()])
             IndexScan(table=[[PUBLIC, LINEITEM]], index=[L_SD], type=[SORTED], 
searchBounds=[[RangeBounds [lowerBound=null, upperBound=-(1998-12-01, 
7776000000:INTERVAL DAY), lowerInclude=true, upperInclude=true]]], 
filters=[<=($t6, -(1998-12-01, 7776000000:INTERVAL DAY))], projects=[[$t4, $t5, 
$t0, $t1, *($t1, -(1, $t2)), *(*($t1, -(1, $t2)), +(1, $t3)), $t2]], 
requiredColumns=[{4, 5, 6, 7, 8, 9, 10}], collation=[[10]])
diff --git 
a/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/tpch/TpchTables.java
 
b/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/tpch/TpchTables.java
index f335a03ccf..9d79229c54 100644
--- 
a/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/tpch/TpchTables.java
+++ 
b/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/tpch/TpchTables.java
@@ -17,11 +17,43 @@
 
 package org.apache.ignite.internal.sql.engine.util.tpch;
 
+import static org.apache.ignite.sql.ColumnType.DATE;
+import static org.apache.ignite.sql.ColumnType.DECIMAL;
+import static org.apache.ignite.sql.ColumnType.INT32;
+import static org.apache.ignite.sql.ColumnType.STRING;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.LocalDate;
+import java.util.Iterator;
+import org.apache.ignite.sql.ColumnType;
+import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.table.KeyValueView;
+
 /**
  * Enumeration of tables from TPC-H specification.
  */
 public enum TpchTables {
-    LINEITEM {
+    LINEITEM(
+            new Column("L_ORDERKEY", INT32),
+            new Column("L_PARTKEY", INT32),
+            new Column("L_SUPPKEY", INT32),
+            new Column("L_LINENUMBER", INT32),
+            new Column("L_QUANTITY", DECIMAL),
+            new Column("L_EXTENDEDPRICE", DECIMAL),
+            new Column("L_DISCOUNT", DECIMAL),
+            new Column("L_TAX", DECIMAL),
+            new Column("L_RETURNFLAG", STRING),
+            new Column("L_LINESTATUS", STRING),
+            new Column("L_SHIPDATE", DATE),
+            new Column("L_COMMITDATE", DATE),
+            new Column("L_RECEIPTDATE", DATE),
+            new Column("L_SHIPINSTRUCT", STRING),
+            new Column("L_SHIPMODE", STRING),
+            new Column("L_COMMENT", STRING)
+    ) {
         @Override
         public String insertPrepareStatement() {
             return "INSERT INTO " + tableName() + " VALUES ("
@@ -32,7 +64,17 @@ public enum TpchTables {
         }
     },
 
-    PART {
+    PART(
+            new Column("P_PARTKEY", INT32),
+            new Column("P_NAME", STRING),
+            new Column("P_MFGR", STRING),
+            new Column("P_BRAND", STRING),
+            new Column("P_TYPE", STRING),
+            new Column("P_SIZE", INT32),
+            new Column("P_CONTAINER", STRING),
+            new Column("P_RETAILPRICE", DECIMAL),
+            new Column("P_COMMENT", STRING)
+    ) {
         @Override
         public String insertPrepareStatement() {
             return "INSERT INTO " + tableName() + " VALUES ("
@@ -42,7 +84,15 @@ public enum TpchTables {
         }
     },
 
-    SUPPLIER {
+    SUPPLIER(
+            new Column("S_SUPPKEY", INT32),
+            new Column("S_NAME", STRING),
+            new Column("S_ADDRESS", STRING),
+            new Column("S_NATIONKEY", INT32),
+            new Column("S_PHONE", STRING),
+            new Column("S_ACCTBAL", DECIMAL),
+            new Column("S_COMMENT", STRING)
+    ) {
         @Override
         public String insertPrepareStatement() {
             return "INSERT INTO " + tableName() + " VALUES ("
@@ -51,7 +101,13 @@ public enum TpchTables {
         }
     },
 
-    PARTSUPP {
+    PARTSUPP(
+            new Column("PS_PARTKEY", INT32),
+            new Column("PS_SUPPKEY", INT32),
+            new Column("PS_AVAILQTY", INT32),
+            new Column("PS_SUPPLYCOST", DECIMAL),
+            new Column("PS_COMMENT", STRING)
+    ) {
         @Override
         public String insertPrepareStatement() {
             return "INSERT INTO " + tableName() + " VALUES ("
@@ -60,7 +116,12 @@ public enum TpchTables {
         }
     },
 
-    NATION {
+    NATION(
+            new Column("N_NATIONKEY", INT32),
+            new Column("N_NAME", STRING),
+            new Column("N_REGIONKEY", INT32),
+            new Column("N_COMMENT", STRING)
+    ) {
         @Override
         public String insertPrepareStatement() {
             return "INSERT INTO " + tableName() + " VALUES ("
@@ -68,7 +129,11 @@ public enum TpchTables {
         }
     },
 
-    REGION {
+    REGION(
+            new Column("R_REGIONKEY", INT32),
+            new Column("R_NAME", STRING),
+            new Column("R_COMMENT", STRING)
+    ) {
         @Override
         public String insertPrepareStatement() {
             return "INSERT INTO " + tableName() + " VALUES ("
@@ -76,7 +141,17 @@ public enum TpchTables {
         }
     },
 
-    ORDERS {
+    ORDERS(
+            new Column("O_ORDERKEY", INT32),
+            new Column("O_CUSTKEY", INT32),
+            new Column("O_ORDERSTATUS", STRING),
+            new Column("O_TOTALPRICE", DECIMAL),
+            new Column("O_ORDERDATE", DATE),
+            new Column("O_ORDERPRIORITY", STRING),
+            new Column("O_CLERK", STRING),
+            new Column("O_SHIPPRIORITY", INT32),
+            new Column("O_COMMENT", STRING)
+    ) {
         @Override
         public String insertPrepareStatement() {
             return "INSERT INTO " + tableName() + " VALUES ("
@@ -86,7 +161,16 @@ public enum TpchTables {
         }
     },
 
-    CUSTOMER {
+    CUSTOMER(
+            new Column("C_CUSTKEY", INT32),
+            new Column("C_NAME", STRING),
+            new Column("C_ADDRESS", STRING),
+            new Column("C_NATIONKEY", INT32),
+            new Column("C_PHONE", STRING),
+            new Column("C_ACCTBAL", DECIMAL),
+            new Column("C_MKTSEGMENT", STRING),
+            new Column("C_COMMENT", STRING)
+    ) {
         @Override
         public String insertPrepareStatement() {
             return "INSERT INTO " + tableName() + " VALUES ("
@@ -96,11 +180,26 @@ public enum TpchTables {
         }
     };
 
+    private final Column[] columns;
+
+    TpchTables(Column... columns) {
+        this.columns = columns;
+    }
+
     /** Returns name of the table. */
     public String tableName() {
         return name().toLowerCase();
     }
 
+    /** Returns the number of columns in the table. */
+    public int columnsCount() {
+        return columns.length;
+    }
+
+    public String columnName(int idx) {
+        return columns[idx].name;
+    }
+
     /** Returns definition of a table including necessary indexes. */
     public String ddlScript() {
         return TpchHelper.loadFromResource("tpch/ddl/" + tableName() + 
"_ddl.sql");
@@ -116,7 +215,56 @@ public enum TpchTables {
      * <p>The statement returned is tolerant to columns' type mismatch, 
implying you
      * can use any value while there is cast from provided value to required 
type.
      */
-    public String insertPrepareStatement() {
-        throw new AssertionError("`insertPrepareStatement` must be overriden");
+    public abstract String insertPrepareStatement();
+
+    /**
+     * Returns iterator returning rows of the corresponding table.
+     *
+     * <p>May be used to fill the table via {@link KeyValueView KV API} or 
{@link IgniteSql SQL API}.
+     *
+     * @param pathToDataset A path to a directory with CSV file containing 
data for the table.
+     * @return Iterator over data of the table.
+     * @throws IOException In case of error.
+     */
+    public Iterator<Object[]> dataProvider(Path pathToDataset) throws 
IOException {
+        return Files.lines(pathToDataset.resolve(tableName() + ".tbl"))
+                .map(this::csvLineToTableValues)
+                .iterator();
+    }
+
+    private Object[] csvLineToTableValues(String line) {
+        String[] stringValues = line.split("\\|");
+        Object[] values = new Object[columns.length];
+
+        for (int i = 0; i < columns.length; i++) {
+            switch (columns[i].type) {
+                case INT32:
+                    values[i] = Integer.valueOf(stringValues[i]);
+                    break;
+                case DECIMAL:
+                    values[i] = new BigDecimal(stringValues[i]);
+                    break;
+                case DATE:
+                    values[i] = LocalDate.parse(stringValues[i]);
+                    break;
+                case STRING:
+                    values[i] = stringValues[i];
+                    break;
+                default:
+                    throw new 
IllegalStateException(columns[i].type.toString());
+            }
+        }
+
+        return values;
+    }
+
+    private static class Column {
+        private final String name;
+        private final ColumnType type;
+
+        private Column(String name, ColumnType type) {
+            this.name = name;
+            this.type = type;
+        }
     }
 }
diff --git 
a/modules/sql-engine/src/testFixtures/resources/tpch/ddl/lineitem_ddl.sql 
b/modules/sql-engine/src/testFixtures/resources/tpch/ddl/lineitem_ddl.sql
index a08f8f1603..1e7629d576 100644
--- a/modules/sql-engine/src/testFixtures/resources/tpch/ddl/lineitem_ddl.sql
+++ b/modules/sql-engine/src/testFixtures/resources/tpch/ddl/lineitem_ddl.sql
@@ -24,8 +24,6 @@ CREATE TABLE lineitem (
     PRIMARY KEY (l_orderkey, l_linenumber)
 );
 
-CREATE INDEX l_pk ON lineitem (l_partkey ASC);
-CREATE INDEX l_sk ON lineitem (l_suppkey ASC);
 CREATE INDEX l_sd ON lineitem (l_shipdate ASC);
 CREATE INDEX l_cd ON lineitem (l_commitdate ASC);
 CREATE INDEX l_rd ON lineitem (l_receiptdate ASC);
diff --git 
a/modules/sql-engine/src/testFixtures/resources/tpch/ddl/partsupp_ddl.sql 
b/modules/sql-engine/src/testFixtures/resources/tpch/ddl/partsupp_ddl.sql
index 5c5d3fc820..b93210e667 100644
--- a/modules/sql-engine/src/testFixtures/resources/tpch/ddl/partsupp_ddl.sql
+++ b/modules/sql-engine/src/testFixtures/resources/tpch/ddl/partsupp_ddl.sql
@@ -13,6 +13,5 @@ CREATE TABLE partsupp (
     PRIMARY KEY (ps_partkey, ps_suppkey)
 );
 
-CREATE INDEX ps_sk ON partsupp (ps_suppkey ASC);
 CREATE INDEX ps_pk ON partsupp (ps_partkey ASC);
 CREATE INDEX ps_sk_pk ON partsupp (ps_suppkey ASC, ps_partkey ASC);


Reply via email to