This is an automated email from the ASF dual-hosted git repository.
korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new a7a6e4e93e IGNITE-20389 Sql. Avoid unconditional conversion of table
row in sql engine (#2568)
a7a6e4e93e is described below
commit a7a6e4e93ee0be3563491e1cf461f342f0e24267
Author: korlov42 <[email protected]>
AuthorDate: Wed Sep 13 16:14:05 2023 +0300
IGNITE-20389 Sql. Avoid unconditional conversion of table row in sql engine
(#2568)
---
.../internal/benchmark/SqlOneNodeBenchmark.java | 147 +++++++++++++
.../ignite/internal/schema/BinaryTupleSchema.java | 3 +-
.../ignite/internal/schema/SchemaTestUtils.java | 5 +-
modules/sql-engine/build.gradle | 1 +
.../internal/sql/api/AsyncResultSetImpl.java | 5 +
.../engine/exec/ExecutableTableRegistryImpl.java | 9 +-
.../sql/engine/exec/ScannableTableImpl.java | 18 +-
.../sql/engine/exec/TableRowConverter.java | 15 +-
...onverter.java => TableRowConverterFactory.java} | 24 +--
.../sql/engine/exec/TableRowConverterImpl.java | 84 ++++----
.../sql/engine/exec/UpdatableTableImpl.java | 2 +-
.../sql/engine/schema/TableDescriptor.java | 2 +-
.../sql/engine/schema/TableDescriptorImpl.java | 7 +
.../sql/engine/util/AbstractProjectedTuple.java | 227 +++++++++++++++++++++
.../util/FieldDeserializingProjectedTuple.java | 75 +++++++
.../sql/engine/util/FormatAwareProjectedTuple.java | 93 +++++++++
.../exec/ExecutableTableRegistrySelfTest.java | 3 +
.../engine/exec/rel/ScannableTableSelfTest.java | 31 ++-
.../exec/rel/TableScanNodeExecutionTest.java | 5 +-
.../sql/engine/planner/AbstractPlannerTest.java | 6 +
.../sql/engine/util/ProjectedTupleTest.java | 179 ++++++++++++++++
21 files changed, 837 insertions(+), 104 deletions(-)
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlOneNodeBenchmark.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlOneNodeBenchmark.java
new file mode 100644
index 0000000000..711ef16845
--- /dev/null
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlOneNodeBenchmark.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmark;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.Tuple;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/**
+ * Benchmark that runs sql queries via embedded client on single node cluster.
+ */
+@State(Scope.Benchmark)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@BenchmarkMode(Mode.AverageTime)
+@Warmup(iterations = 3, time = 5)
+@Measurement(iterations = 5, time = 5)
+@Threads(1)
+@Fork(1)
+@SuppressWarnings({"WeakerAccess", "unused"})
+public class SqlOneNodeBenchmark extends AbstractOneNodeBenchmark {
+ private static final int TABLE_SIZE = 30_000;
+
+ private Session session;
+
+ /** Fills the table with data. */
+ @Setup
+ public void setUp() throws IOException {
+ KeyValueView<Tuple, Tuple> keyValueView =
clusterNode.tables().table(TABLE_NAME).keyValueView();
+
+ Tuple payload = Tuple.create();
+ for (int j = 1; j <= 10; j++) {
+ payload.set("field" + j, FIELD_VAL);
+ }
+
+ int batchSize = 1_000;
+ Map<Tuple, Tuple> batch = new HashMap<>();
+ for (int i = 0; i < TABLE_SIZE; i++) {
+ batch.put(Tuple.create().set("ycsb_key", i), payload);
+
+ if (batch.size() == batchSize) {
+ keyValueView.putAll(null, batch);
+
+ batch.clear();
+ }
+ }
+
+ if (!batch.isEmpty()) {
+ keyValueView.putAll(null, batch);
+
+ batch.clear();
+ }
+
+ session = clusterNode.sql().createSession();
+ }
+
+ /** Benchmark that measures performance of `SELECT count(*)` query over
entire table. */
+ @Benchmark
+ public void countAll(Blackhole bh) {
+ try (var rs = session.execute(null, "SELECT count(*) FROM usertable"))
{
+ bh.consume(rs.next());
+ }
+ }
+
+ /** Benchmark that measures performance of `SELECT count(1)` query over
entire table. */
+ @Benchmark
+ public void count1(Blackhole bh) {
+ try (var rs = session.execute(null, "SELECT count(1) FROM usertable"))
{
+ bh.consume(rs.next());
+ }
+ }
+
+ /** Benchmark that measures performance of `SELECT count(key)` query over
entire table. */
+ @Benchmark
+ public void countKey(Blackhole bh) {
+ try (var rs = session.execute(null, "SELECT count(ycsb_key) FROM
usertable")) {
+ bh.consume(rs.next());
+ }
+ }
+
+ /** Benchmark that measures performance of `SELECT count(val)` query over
entire table. */
+ @Benchmark
+ public void countVal(Blackhole bh) {
+ try (var rs = session.execute(null, "SELECT count(field2) FROM
usertable")) {
+ bh.consume(rs.next());
+ }
+ }
+
+ /** Benchmark that measures performance of `SELECT *` query over entire
table. */
+ @Benchmark
+ @Warmup(iterations = 3, time = 5)
+ @Measurement(iterations = 5, time = 5)
+ public void selectAll(Blackhole bh) {
+ try (var rs = session.execute(null, "SELECT * FROM usertable")) {
+ while (rs.hasNext()) {
+ bh.consume(rs.next());
+ }
+ }
+ }
+
+ /**
+ * Benchmark's entry point.
+ */
+ public static void main(String[] args) throws RunnerException {
+ Options opt = new OptionsBuilder()
+ .include(".*" + SqlOneNodeBenchmark.class.getSimpleName() +
".*")
+ .build();
+
+ new Runner(opt).run();
+ }
+}
+
+
diff --git
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTupleSchema.java
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTupleSchema.java
index d5e9e8d356..1451e9407e 100644
---
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTupleSchema.java
+++
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTupleSchema.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.schema;
import java.math.BigDecimal;
import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
+import org.apache.ignite.internal.schema.row.InternalTuple;
import org.jetbrains.annotations.Nullable;
/**
@@ -287,7 +288,7 @@ public class BinaryTupleSchema {
* @param index Field index to read.
* @return An Object representation of the value.
*/
- public Object value(BinaryTupleReader tuple, int index) {
+ public Object value(InternalTuple tuple, int index) {
Element element = element(index);
switch (element.typeSpec) {
diff --git
a/modules/schema/src/testFixtures/java/org/apache/ignite/internal/schema/SchemaTestUtils.java
b/modules/schema/src/testFixtures/java/org/apache/ignite/internal/schema/SchemaTestUtils.java
index 83bc16ce09..8fc609b339 100644
---
a/modules/schema/src/testFixtures/java/org/apache/ignite/internal/schema/SchemaTestUtils.java
+++
b/modules/schema/src/testFixtures/java/org/apache/ignite/internal/schema/SchemaTestUtils.java
@@ -33,6 +33,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.Set;
+import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
@@ -48,7 +49,7 @@ public final class SchemaTestUtils {
private static final int MAX_YEAR = (1 << 14) - 1;
/** All types for tests. */
- public static List<NativeType> ALL_TYPES = List.of(
+ public static final List<NativeType> ALL_TYPES = List.of(
NativeTypes.BOOLEAN,
NativeTypes.INT8,
NativeTypes.INT16,
@@ -98,7 +99,7 @@ public final class SchemaTestUtils {
return rnd.nextDouble();
case UUID:
- return new java.util.UUID(rnd.nextLong(), rnd.nextLong());
+ return new UUID(rnd.nextLong(), rnd.nextLong());
case STRING:
return IgniteTestUtils.randomString(rnd, rnd.nextInt(255));
diff --git a/modules/sql-engine/build.gradle b/modules/sql-engine/build.gradle
index 55f708a199..33bd95b942 100644
--- a/modules/sql-engine/build.gradle
+++ b/modules/sql-engine/build.gradle
@@ -82,6 +82,7 @@ dependencies {
testImplementation libs.jmh.core
testImplementation(testFixtures(project(':ignite-core')))
testImplementation(testFixtures(project(':ignite-configuration')))
+ testImplementation(testFixtures(project(':ignite-schema')))
testImplementation(testFixtures(project(':ignite-storage-api')))
testImplementation(testFixtures(project(':ignite-distribution-zones')))
testImplementation(testFixtures(project(':ignite-placement-driver-api')))
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
index 25f81b7a89..0a43985276 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
@@ -414,5 +414,10 @@ public class AsyncResultSetImpl<T> implements
AsyncResultSet<T> {
public ResultSetMetadata metadata() {
return meta;
}
+
+ @Override
+ public String toString() {
+ return "Row " + row;
+ }
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java
index 27816ec190..0608c85e51 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java
@@ -36,7 +36,6 @@ import
org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.distributed.TableManager;
-
/**
* Implementation of {@link ExecutableTableRegistry}.
*/
@@ -88,12 +87,14 @@ public class ExecutableTableRegistryImpl implements
ExecutableTableRegistry, Sch
return f.thenApply((table) -> {
SchemaRegistry schemaRegistry = table.getValue();
SchemaDescriptor schemaDescriptor = schemaRegistry.schema();
- TableRowConverter rowConverter = new
TableRowConverterImpl(schemaRegistry, schemaDescriptor, tableDescriptor);
+ TableRowConverterFactory converterFactory = requiredColumns -> new
TableRowConverterImpl(
+ schemaRegistry, schemaDescriptor, tableDescriptor,
requiredColumns
+ );
InternalTable internalTable = table.getKey();
- ScannableTable scannableTable = new
ScannableTableImpl(internalTable, rowConverter, tableDescriptor);
+ ScannableTable scannableTable = new
ScannableTableImpl(internalTable, converterFactory, tableDescriptor);
UpdatableTableImpl updatableTable = new
UpdatableTableImpl(tableId, tableDescriptor, internalTable.partitions(),
- replicaService, clock, rowConverter, schemaDescriptor);
+ replicaService, clock, converterFactory.create(null),
schemaDescriptor);
return new ExecutableTableImpl(internalTable, scannableTable,
updatableTable);
});
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTableImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTableImpl.java
index 147086a52b..158fd7a94a 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTableImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTableImpl.java
@@ -42,14 +42,14 @@ public class ScannableTableImpl implements ScannableTable {
private final InternalTable internalTable;
- private final TableRowConverter rowConverter;
+ private final TableRowConverterFactory converterFactory;
private final TableDescriptor tableDescriptor;
/** Constructor. */
- public ScannableTableImpl(InternalTable internalTable, TableRowConverter
rowConverter, TableDescriptor tableDescriptor) {
+ public ScannableTableImpl(InternalTable internalTable,
TableRowConverterFactory converterFactory, TableDescriptor tableDescriptor) {
this.internalTable = internalTable;
- this.rowConverter = rowConverter;
+ this.converterFactory = converterFactory;
this.tableDescriptor = tableDescriptor;
}
@@ -73,7 +73,9 @@ public class ScannableTableImpl implements ScannableTable {
pub = internalTable.scan(partWithTerm.partId(), txAttributes.id(),
recipient, null, null, null, 0, null);
}
- return new TransformingPublisher<>(pub, item ->
rowConverter.toRow(ctx, item, rowFactory, requiredColumns));
+ TableRowConverter rowConverter =
converterFactory.create(requiredColumns);
+
+ return new TransformingPublisher<>(pub, item ->
rowConverter.toRow(ctx, item, rowFactory));
}
/** {@inheritDoc} */
@@ -133,7 +135,9 @@ public class ScannableTableImpl implements ScannableTable {
);
}
- return new TransformingPublisher<>(pub, item ->
rowConverter.toRow(ctx, item, rowFactory, requiredColumns));
+ TableRowConverter rowConverter =
converterFactory.create(requiredColumns);
+
+ return new TransformingPublisher<>(pub, item ->
rowConverter.toRow(ctx, item, rowFactory));
}
/** {@inheritDoc} */
@@ -173,7 +177,9 @@ public class ScannableTableImpl implements ScannableTable {
);
}
- return new TransformingPublisher<>(pub, item ->
rowConverter.toRow(ctx, item, rowFactory, requiredColumns));
+ TableRowConverter rowConverter =
converterFactory.create(requiredColumns);
+
+ return new TransformingPublisher<>(pub, item ->
rowConverter.toRow(ctx, item, rowFactory));
}
private <RowT> @Nullable BinaryTuplePrefix
toBinaryTuplePrefix(ExecutionContext<RowT> ctx,
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TableRowConverter.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TableRowConverter.java
index 2e84a07c8b..f8bcbd3898 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TableRowConverter.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TableRowConverter.java
@@ -17,9 +17,7 @@
package org.apache.ignite.internal.sql.engine.exec;
-import java.util.BitSet;
import org.apache.ignite.internal.schema.BinaryRow;
-import org.jetbrains.annotations.Nullable;
/**
* Converts rows to execution engine representation.
@@ -27,18 +25,17 @@ import org.jetbrains.annotations.Nullable;
public interface TableRowConverter {
/**
- * Converts a tuple to relational node row.
+ * Converts a table row to relational node row.
*
- * @param ectx Execution context.
- * @param row Tuple to convert.
- * @param requiredColumns Participating columns.
+ * @param ectx Execution context.
+ * @param tableRow Tuple to convert.
+ * @param factory Factory to use to create a sql row from given table row.
* @return Relational node row.
*/
<RowT> RowT toRow(
ExecutionContext<RowT> ectx,
- BinaryRow row,
- RowHandler.RowFactory<RowT> factory,
- @Nullable BitSet requiredColumns
+ BinaryRow tableRow,
+ RowHandler.RowFactory<RowT> factory
);
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TableRowConverter.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TableRowConverterFactory.java
similarity index 61%
copy from
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TableRowConverter.java
copy to
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TableRowConverterFactory.java
index 2e84a07c8b..e465fef494 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TableRowConverter.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TableRowConverterFactory.java
@@ -18,27 +18,13 @@
package org.apache.ignite.internal.sql.engine.exec;
import java.util.BitSet;
-import org.apache.ignite.internal.schema.BinaryRow;
import org.jetbrains.annotations.Nullable;
/**
- * Converts rows to execution engine representation.
+ * A factory to create a converter from table row to execution engine
representation
+ * with regard to the set of required columns.
*/
-public interface TableRowConverter {
-
- /**
- * Converts a tuple to relational node row.
- *
- * @param ectx Execution context.
- * @param row Tuple to convert.
- * @param requiredColumns Participating columns.
- * @return Relational node row.
- */
- <RowT> RowT toRow(
- ExecutionContext<RowT> ectx,
- BinaryRow row,
- RowHandler.RowFactory<RowT> factory,
- @Nullable BitSet requiredColumns
- );
-
+@FunctionalInterface
+public interface TableRowConverterFactory {
+ TableRowConverter create(@Nullable BitSet requiredColumns);
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TableRowConverterImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TableRowConverterImpl.java
index 5c58079614..355a0b42c4 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TableRowConverterImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TableRowConverterImpl.java
@@ -18,77 +18,81 @@
package org.apache.ignite.internal.sql.engine.exec;
import java.util.BitSet;
-import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.BinaryRowConverter;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTupleSchema;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaRegistry;
-import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.schema.row.InternalTuple;
+import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
+import
org.apache.ignite.internal.sql.engine.util.FieldDeserializingProjectedTuple;
+import org.apache.ignite.internal.sql.engine.util.FormatAwareProjectedTuple;
import org.jetbrains.annotations.Nullable;
/**
* Converts rows to execution engine representation.
*/
public class TableRowConverterImpl implements TableRowConverter {
-
private final SchemaRegistry schemaRegistry;
private final SchemaDescriptor schemaDescriptor;
- private final TableDescriptor desc;
-
private final BinaryTupleSchema binaryTupleSchema;
+ private final int[] mapping;
+
/** Constructor. */
- public TableRowConverterImpl(SchemaRegistry schemaRegistry,
SchemaDescriptor schemaDescriptor, TableDescriptor desc) {
+ TableRowConverterImpl(
+ SchemaRegistry schemaRegistry,
+ SchemaDescriptor schemaDescriptor,
+ TableDescriptor descriptor,
+ @Nullable BitSet requiredColumns
+ ) {
this.schemaRegistry = schemaRegistry;
this.schemaDescriptor = schemaDescriptor;
- this.desc = desc;
this.binaryTupleSchema =
BinaryTupleSchema.createRowSchema(schemaDescriptor);
- }
- /** {@inheritDoc} */
- @Override
- public <RowT> RowT toRow(
- ExecutionContext<RowT> ectx,
- BinaryRow binaryRow,
- RowHandler.RowFactory<RowT> factory,
- @Nullable BitSet requiredColumns
- ) {
- Row row = schemaRegistry.resolve(binaryRow, schemaDescriptor);
+ int size = requiredColumns == null ? descriptor.columnsCount() :
requiredColumns.cardinality();
- BinaryTuple tuple = requiredColumns == null
- ? allColumnsTuple(row, binaryTupleSchema)
- : requiredColumnsTuple(row, binaryTupleSchema,
requiredColumns);
+ mapping = new int[size];
- return factory.create(tuple);
- }
-
- private BinaryTuple allColumnsTuple(Row row, BinaryTupleSchema
binarySchema) {
- BinaryTupleBuilder tupleBuilder = new
BinaryTupleBuilder(desc.columnsCount());
-
- for (int i = 0; i < desc.columnsCount(); i++) {
- int index = desc.columnDescriptor(i).physicalIndex();
+ int currentIdx = 0;
+ for (ColumnDescriptor column : descriptor) {
+ if (requiredColumns != null &&
!requiredColumns.get(column.logicalIndex())) {
+ continue;
+ }
- BinaryRowConverter.appendValue(tupleBuilder,
binarySchema.element(index), binarySchema.value(row, index));
+ mapping[currentIdx++] = column.physicalIndex();
}
-
- return new BinaryTuple(tupleBuilder.numElements(),
tupleBuilder.build());
}
- private BinaryTuple requiredColumnsTuple(Row row, BinaryTupleSchema
binarySchema, BitSet requiredColumns) {
- BinaryTupleBuilder tupleBuilder = new
BinaryTupleBuilder(requiredColumns.cardinality());
-
- for (int i = requiredColumns.nextSetBit(0); i != -1; i =
requiredColumns.nextSetBit(i + 1)) {
- int index = desc.columnDescriptor(i).physicalIndex();
-
- BinaryRowConverter.appendValue(tupleBuilder,
binarySchema.element(index), binarySchema.value(row, index));
+ /** {@inheritDoc} */
+ @Override
+ public <RowT> RowT toRow(
+ ExecutionContext<RowT> ectx,
+ BinaryRow tableRow,
+ RowHandler.RowFactory<RowT> factory
+ ) {
+ InternalTuple tuple;
+ if (tableRow.schemaVersion() == schemaDescriptor.version()) {
+ InternalTuple tableTuple = new
BinaryTuple(schemaDescriptor.length(), tableRow.tupleSlice());
+
+ tuple = new FormatAwareProjectedTuple(
+ tableTuple,
+ mapping
+ );
+ } else {
+ InternalTuple tableTuple = schemaRegistry.resolve(tableRow,
schemaDescriptor);
+
+ tuple = new FieldDeserializingProjectedTuple(
+ binaryTupleSchema,
+ tableTuple,
+ mapping
+ );
}
- return new BinaryTuple(tupleBuilder.numElements(),
tupleBuilder.build());
+ return factory.create(tuple);
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
index d782948e85..82250927d4 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
@@ -277,7 +277,7 @@ public final class UpdatableTableImpl implements
UpdatableTable {
List<RowT> conflictRows = new
ArrayList<>(binaryRows.size());
for (BinaryRow row : binaryRows) {
- conflictRows.add(rowConverter.toRow(ectx, row,
rowFactory, null));
+ conflictRows.add(rowConverter.toRow(ectx, row,
rowFactory));
}
return conflictRows;
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/TableDescriptor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/TableDescriptor.java
index 8cd6d9372f..ff9a855891 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/TableDescriptor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/TableDescriptor.java
@@ -29,7 +29,7 @@ import org.jetbrains.annotations.Nullable;
* TableDescriptor interface.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
-public interface TableDescriptor extends InitializerExpressionFactory {
+public interface TableDescriptor extends InitializerExpressionFactory,
Iterable<ColumnDescriptor> {
/** Returns distribution of the table. */
IgniteDistribution distribution();
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/TableDescriptorImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/TableDescriptorImpl.java
index af2bc9b78b..13abfbf71b 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/TableDescriptorImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/TableDescriptorImpl.java
@@ -19,7 +19,9 @@ package org.apache.ignite.internal.sql.engine.schema;
import static
org.apache.ignite.internal.sql.engine.util.TypeUtils.native2relationalType;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -115,6 +117,11 @@ public class TableDescriptorImpl extends
NullInitializerExpressionFactory implem
);
}
+ @Override
+ public Iterator<ColumnDescriptor> iterator() {
+ return Arrays.stream(descriptors).iterator();
+ }
+
/** {@inheritDoc} */
@Override
public RelDataType insertRowType(IgniteTypeFactory factory) {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/AbstractProjectedTuple.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/AbstractProjectedTuple.java
new file mode 100644
index 0000000000..dbcf66a3c2
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/AbstractProjectedTuple.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.util;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.BitSet;
+import java.util.UUID;
+import org.apache.ignite.internal.schema.row.InternalTuple;
+
+/**
+ * Projected Tuple is a facade that creates projection of the given tuple.
+ *
+ * <p>This particular abstraction provides implementation of {@link
InternalTuple}, leaving
+ * to derived class to implement only rebuilding of the original tuple with
regards to the
+ * provided projection.
+ *
+ * <p>Not thread safe!
+ *
+ * <p>A projection is used to change indexes of column in original tuple, or
to trim
+ * few columns from original tuple. Here are a few examples:<pre>
+ * Having tuple ['foo', 'bar', 'baz'], we can
+ *
+ * - reorder fields with mapping [2, 1, 0] to get equivalent tuple ['baz',
'bar', 'foo']
+ * - or trim certain fields with mapping [0, 2] to get equivalent tuple
['foo', 'baz']
+ * - or even repeat some fields with mapping [0, 0, 0] to get equivalent
tuple ['foo', 'foo', 'foo']
+ * </pre>
+ */
+abstract class AbstractProjectedTuple implements InternalTuple {
+ InternalTuple delegate;
+ int[] projection;
+
+ private boolean normalized = false;
+
+ /**
+ * Constructor.
+ *
+ * @param delegate An original tuple to create projection from.
+ * @param projection A projection. That is, desired order of fields in
original tuple. In that projection, index of the array is
+ * an index of field in resulting projection, and an element of
the array at that index is an index of column in original
+ * tuple.
+ */
+ AbstractProjectedTuple(
+ InternalTuple delegate,
+ int[] projection
+ ) {
+ this.delegate = delegate;
+ this.projection = projection;
+ }
+
+ @Override
+ public int elementCount() {
+ return projection.length;
+ }
+
+ @Override
+ public boolean hasNullValue(int col) {
+ return delegate.hasNullValue(projection[col]);
+ }
+
+ @Override
+ public boolean booleanValue(int col) {
+ return delegate.booleanValue(projection[col]);
+ }
+
+ @Override
+ public Boolean booleanValueBoxed(int col) {
+ return delegate.booleanValueBoxed(projection[col]);
+ }
+
+ @Override
+ public byte byteValue(int col) {
+ return delegate.byteValue(projection[col]);
+ }
+
+ @Override
+ public Byte byteValueBoxed(int col) {
+ return delegate.byteValueBoxed(projection[col]);
+ }
+
+ @Override
+ public short shortValue(int col) {
+ return delegate.shortValue(projection[col]);
+ }
+
+ @Override
+ public Short shortValueBoxed(int col) {
+ return delegate.shortValueBoxed(projection[col]);
+ }
+
+ @Override
+ public int intValue(int col) {
+ return delegate.intValue(projection[col]);
+ }
+
+ @Override
+ public Integer intValueBoxed(int col) {
+ return delegate.intValueBoxed(projection[col]);
+ }
+
+ @Override
+ public long longValue(int col) {
+ return delegate.longValue(projection[col]);
+ }
+
+ @Override
+ public Long longValueBoxed(int col) {
+ return delegate.longValueBoxed(projection[col]);
+ }
+
+ @Override
+ public float floatValue(int col) {
+ return delegate.floatValue(projection[col]);
+ }
+
+ @Override
+ public Float floatValueBoxed(int col) {
+ return delegate.floatValueBoxed(projection[col]);
+ }
+
+ @Override
+ public double doubleValue(int col) {
+ return delegate.doubleValue(projection[col]);
+ }
+
+ @Override
+ public Double doubleValueBoxed(int col) {
+ return delegate.doubleValueBoxed(projection[col]);
+ }
+
+ @Override
+ public BigDecimal decimalValue(int col, int decimalScale) {
+ return delegate.decimalValue(projection[col], decimalScale);
+ }
+
+ @Override
+ public BigInteger numberValue(int col) {
+ return delegate.numberValue(projection[col]);
+ }
+
+ @Override
+ public String stringValue(int col) {
+ return delegate.stringValue(projection[col]);
+ }
+
+ @Override
+ public byte[] bytesValue(int col) {
+ return delegate.bytesValue(projection[col]);
+ }
+
+ @Override
+ public UUID uuidValue(int col) {
+ return delegate.uuidValue(projection[col]);
+ }
+
+ @Override
+ public BitSet bitmaskValue(int col) {
+ return delegate.bitmaskValue(projection[col]);
+ }
+
+ @Override
+ public LocalDate dateValue(int col) {
+ return delegate.dateValue(projection[col]);
+ }
+
+ @Override
+ public LocalTime timeValue(int col) {
+ return delegate.timeValue(projection[col]);
+ }
+
+ @Override
+ public LocalDateTime dateTimeValue(int col) {
+ return delegate.dateTimeValue(projection[col]);
+ }
+
+ @Override
+ public Instant timestampValue(int col) {
+ return delegate.timestampValue(projection[col]);
+ }
+
+ @Override
+ public ByteBuffer byteBuffer() {
+ normalizeIfNeeded();
+
+ return delegate.byteBuffer();
+ }
+
+ /**
+ * Rebuild an original tuple with respect to the given projection.
+ *
+ * <p>It's guaranteed that this method will be called at most once, thus
no additional checks are required.
+ *
+ * <p>It's supposed that implementations of this method will replace
{@link #delegate} and {@link #projection}
+ * with normalized ones.
+ */
+ protected abstract void normalize();
+
+ private void normalizeIfNeeded() {
+ if (normalized) {
+ return;
+ }
+
+ normalize();
+
+ normalized = true;
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/FieldDeserializingProjectedTuple.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/FieldDeserializingProjectedTuple.java
new file mode 100644
index 0000000000..926fc89b47
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/FieldDeserializingProjectedTuple.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.util;
+
+import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
+import org.apache.ignite.internal.schema.BinaryRowConverter;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
+import org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
+import org.apache.ignite.internal.schema.row.InternalTuple;
+
+/**
+ * A projected tuple that doesn't require delegate to be in particular format.
+ *
+ * <p>During normalization, the original tuple will be read field by field
with regard to provided projection.
+ * Such an approach had an additional overhead on (de-)serialization fields
value, but had no requirement for the tuple
+ * to be compatible with Binary Tuple format.
+ *
+ * <p>Not thread safe!
+ *
+ * @see AbstractProjectedTuple
+ */
+public class FieldDeserializingProjectedTuple extends AbstractProjectedTuple {
+ private final BinaryTupleSchema schema;
+
+ /**
+ * Constructor.
+ *
+ * @param schema A schema of the original tuple (represented by delegate).
Used to read content of the delegate to build a
+ * proper byte buffer which content satisfying the schema with
regard to given projection.
+ * @param delegate An original tuple to create projection from.
+ * @param projection A projection. That is, desired order of fields in
original tuple. In that projection, index of the array is
+ * an index of field in resulting projection, and an element of
the array at that index is an index of column in original
+ * tuple.
+ */
+ public FieldDeserializingProjectedTuple(BinaryTupleSchema schema,
InternalTuple delegate, int[] projection) {
+ super(delegate, projection);
+
+ this.schema = schema;
+ }
+
+ @Override
+ protected void normalize() {
+ var builder = new BinaryTupleBuilder(projection.length);
+ var newProjection = new int[projection.length];
+
+ for (int i = 0; i < projection.length; i++) {
+ int col = projection[i];
+
+ newProjection[i] = i;
+
+ Element element = schema.element(col);
+
+ BinaryRowConverter.appendValue(builder, element,
schema.value(delegate, col));
+ }
+
+ delegate = new BinaryTuple(projection.length, builder.build());
+ projection = newProjection;
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/FormatAwareProjectedTuple.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/FormatAwareProjectedTuple.java
new file mode 100644
index 0000000000..5ee256b9c9
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/FormatAwareProjectedTuple.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.util;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
+import org.apache.ignite.internal.binarytuple.BinaryTupleParser;
+import org.apache.ignite.internal.binarytuple.BinaryTupleParser.Sink;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.row.InternalTuple;
+
+/**
+ * A projected tuple that aware of the format of delegate.
+ *
+ * <p>That is, the format of delegate is known to be Binary Tuple, thus it's
possible to avoid unnecessary
+ * (de-)serialization during tuple normalization.
+ *
+ * <p>It's up to the caller to get sure that provided tuple respect the format.
+ *
+ * <p>Not thread safe!
+ *
+ * @see AbstractProjectedTuple
+ */
+public class FormatAwareProjectedTuple extends AbstractProjectedTuple {
+ /**
+ * Constructor.
+ *
+ * @param delegate An original tuple to create projection from.
+ * @param projection A projection. That is, desired order of fields in
original tuple. In that projection, index of the array is
+ * an index of field in resulting projection, and an element of
the array at that index is an index of column in original
+ * tuple.
+ */
+ public FormatAwareProjectedTuple(InternalTuple delegate, int[] projection)
{
+ super(delegate, projection);
+ }
+
+ @Override
+ protected void normalize() {
+ int[] newProjection = new int[projection.length];
+ ByteBuffer tupleBuffer = delegate.byteBuffer();
+
+ BinaryTupleParser parser = new
BinaryTupleParser(delegate.elementCount(), tupleBuffer);
+
+ // Estimate total data size.
+ var stats = new Sink() {
+ int estimatedValueSize = 0;
+
+ @Override
+ public void nextElement(int index, int begin, int end) {
+ estimatedValueSize += end - begin;
+ }
+ };
+
+ for (int columnIndex : projection) {
+ parser.fetch(columnIndex, stats);
+ }
+
+ // Now compose the tuple.
+ BinaryTupleBuilder builder = new BinaryTupleBuilder(projection.length,
stats.estimatedValueSize);
+
+ for (int i = 0; i < projection.length; i++) {
+ int columnIndex = projection[i];
+
+ parser.fetch(columnIndex, (index, begin, end) -> {
+ if (begin == end) {
+ builder.appendNull();
+ } else {
+ builder.appendElementBytes(tupleBuffer, begin, end -
begin);
+ }
+ });
+
+ newProjection[i] = columnIndex;
+ }
+
+ delegate = new BinaryTuple(projection.length, builder.build());
+ projection = newProjection;
+ }
+}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistrySelfTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistrySelfTest.java
index eceb2519e7..6d99328dce 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistrySelfTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistrySelfTest.java
@@ -21,6 +21,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.TestHybridClock;
import org.apache.ignite.internal.hlc.HybridClock;
@@ -30,6 +31,7 @@ import org.apache.ignite.internal.schema.NativeTypes;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.TableImpl;
@@ -150,6 +152,7 @@ public class ExecutableTableRegistrySelfTest extends
BaseIgniteAbstractTest {
when(tableManager.tableAsync(tableId)).thenReturn(CompletableFuture.completedFuture(table));
when(schemaManager.schemaRegistry(tableId)).thenReturn(schemaRegistry);
when(schemaRegistry.schema()).thenReturn(schemaDescriptor);
+
when(descriptor.iterator()).thenReturn(List.<ColumnDescriptor>of().iterator());
return registry.getTable(tableId, descriptor);
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
index c9ab8ffe5e..79b6f14149 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
@@ -34,13 +34,11 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
-import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow.Publisher;
@@ -586,7 +584,7 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
this.input = input;
rowConverter = new RowCollectingTableRwoConverter(input);
tableDescriptor = new
TestTableDescriptor(IgniteDistributions::single, input.rowType);
- scannableTable = new ScannableTableImpl(internalTable,
rowConverter, tableDescriptor);
+ scannableTable = new ScannableTableImpl(internalTable, rf ->
rowConverter, tableDescriptor);
}
ResultCollector tableScan(int partitionId, long term, NoOpTransaction
tx) {
@@ -606,7 +604,7 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
Publisher<Object[]> publisher = scannableTable.scan(ctx, new
PartitionWithTerm(partitionId, term), rowFactory, null);
- return new ResultCollector(publisher, requiredFields,
rowConverter);
+ return new ResultCollector(publisher, rowConverter);
}
ResultCollector indexScan(int partitionId, long term, NoOpTransaction
tx,
@@ -645,7 +643,7 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
Publisher<Object[]> publisher = scannableTable.indexRangeScan(ctx,
new PartitionWithTerm(partitionId, term), rowFactory,
indexId, indexColumns, rangeCondition, requiredFields);
- return new ResultCollector(publisher, requiredFields,
rowConverter);
+ return new ResultCollector(publisher, rowConverter);
}
ResultCollector indexLookUp(int partitionId, long term,
NoOpTransaction tx,
@@ -679,7 +677,7 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
Publisher<Object[]> publisher = scannableTable.indexLookup(ctx,
new PartitionWithTerm(partitionId, term), rowFactory,
indexId, indexColumns, key, requiredFields);
- return new ResultCollector(publisher, requiredFields,
rowConverter);
+ return new ResultCollector(publisher, rowConverter);
}
}
@@ -764,20 +762,20 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
final TestInput testInput;
- final List<Map.Entry<BinaryRow, BitSet>> converted = new ArrayList<>();
+ final List<BinaryRow> converted = new ArrayList<>();
RowCollectingTableRwoConverter(TestInput testData) {
this.testInput = testData;
}
@Override
- public <RowT> RowT toRow(ExecutionContext<RowT> ectx, BinaryRow row,
RowFactory<RowT> factory, @Nullable BitSet requiredColumns) {
- Object[] convertedRow = testInput.data.get(row);
+ public <RowT> RowT toRow(ExecutionContext<RowT> ectx, BinaryRow
tableRow, RowFactory<RowT> factory) {
+ Object[] convertedRow = testInput.data.get(tableRow);
if (convertedRow == null) {
- throw new IllegalArgumentException("Unexpected row: " + row);
+ throw new IllegalArgumentException("Unexpected row: " +
tableRow);
}
- converted.add(new SimpleEntry<>(row, requiredColumns));
+ converted.add(tableRow);
return (RowT) convertedRow;
}
}
@@ -792,12 +790,9 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
final RowCollectingTableRwoConverter rowConverter;
- final BitSet requiredFields;
-
- ResultCollector(Publisher<?> input, BitSet requiredFields,
RowCollectingTableRwoConverter rowConverter) {
+ ResultCollector(Publisher<?> input, RowCollectingTableRwoConverter
rowConverter) {
this.input = input;
this.rowConverter = rowConverter;
- this.requiredFields = requiredFields;
input.subscribe(new Subscriber<Object>() {
@Override
@@ -842,10 +837,10 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
}
void expectRow(BinaryRow row) {
- List<Entry<BinaryRow, BitSet>> result = rowConverter.converted;
+ List<BinaryRow> result = rowConverter.converted;
- if (!result.contains(new SimpleEntry<>(row, requiredFields))) {
- fail(format("Unexpected binary row/required fields: {}/{}.
Converted: {}", row, requiredFields, result));
+ if (!result.contains(row)) {
+ fail(format("Unexpected binary row: {}. Converted: {}", row,
result));
}
}
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
index 11a214ff6a..eb47674b0c 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
@@ -108,13 +108,12 @@ public class TableScanNodeExecutionTest extends
AbstractExecutionTest {
TableRowConverter rowConverter = new TableRowConverter() {
@Override
- public <RowT> RowT toRow(ExecutionContext<RowT> ectx,
BinaryRow row, RowFactory<RowT> factory,
- @Nullable BitSet requiredColumns) {
+ public <RowT> RowT toRow(ExecutionContext<RowT> ectx,
BinaryRow tableRow, RowFactory<RowT> factory) {
return (RowT) TestInternalTableImpl.ROW;
}
};
TableDescriptor descriptor = new
TestTableDescriptor(IgniteDistributions::single, rowType);
- ScannableTableImpl scanableTable = new
ScannableTableImpl(internalTable, rowConverter, descriptor);
+ ScannableTableImpl scanableTable = new
ScannableTableImpl(internalTable, rf -> rowConverter, descriptor);
TableScanNode<Object[]> scanNode = new TableScanNode<>(ctx,
rowFactory, scanableTable,
partsWithTerms, null, null, null);
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
index 7df6627236..b90e18e0c0 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
@@ -35,6 +35,7 @@ import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
@@ -741,6 +742,11 @@ public abstract class AbstractPlannerTest extends
IgniteAbstractTest {
this.rowType = rowType;
}
+ @Override
+ public Iterator<ColumnDescriptor> iterator() {
+ throw new UnsupportedOperationException();
+ }
+
/** {@inheritDoc} */
@Override
public IgniteDistribution distribution() {
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/ProjectedTupleTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/ProjectedTupleTest.java
new file mode 100644
index 0000000000..99bd0e2a6a
--- /dev/null
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/ProjectedTupleTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.util;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.schema.BinaryRowConverter;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
+import org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypeSpec;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.SchemaTestUtils;
+import org.apache.ignite.internal.schema.row.InternalTuple;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/** Tests to verify projected tuple facade. */
+class ProjectedTupleTest {
+ private static final IgniteLogger LOG =
Loggers.forClass(ProjectedTupleTest.class);
+
+ private static final BinaryTupleSchema ALL_TYPES_SCHEMA =
BinaryTupleSchema.create(
+ SchemaTestUtils.ALL_TYPES.stream()
+ .map(type -> new Element(type, true))
+ .toArray(Element[]::new)
+ );
+
+ private static BinaryTuple TUPLE;
+ private static Random RND;
+
+ @BeforeAll
+ static void initRandom() {
+ int seed = ThreadLocalRandom.current().nextInt();
+
+ RND = new Random(seed);
+
+ LOG.info("Seed is " + seed);
+
+ var builder = new BinaryTupleBuilder(ALL_TYPES_SCHEMA.elementCount());
+
+ for (int i = 0; i < ALL_TYPES_SCHEMA.elementCount(); i++) {
+ Element e = ALL_TYPES_SCHEMA.element(i);
+
+ BinaryRowConverter.appendValue(builder, e,
SchemaTestUtils.generateRandomValue(RND, fromElement(e)));
+ }
+
+ TUPLE = new BinaryTuple(ALL_TYPES_SCHEMA.elementCount(),
builder.build());
+ }
+
+ @Test
+ void allTypesAreCovered() {
+ List<NativeTypeSpec> coveredTypes = IntStream.range(0,
ALL_TYPES_SCHEMA.elementCount())
+ .mapToObj(ALL_TYPES_SCHEMA::element)
+ .map(Element::typeSpec)
+ .collect(Collectors.toList());
+
+ EnumSet<NativeTypeSpec> allTypes = EnumSet.allOf(NativeTypeSpec.class);
+
+ coveredTypes.forEach(allTypes::remove);
+
+ assertThat(allTypes, empty());
+ }
+
+ @ParameterizedTest
+ @ValueSource(ints = {1, 2, 3})
+ void projectionReturnsProperElementCount(int projectionSize) {
+ InternalTuple projection1 = new FieldDeserializingProjectedTuple(
+ ALL_TYPES_SCHEMA, TUPLE, new int[projectionSize]
+ );
+ InternalTuple projection2 = new FormatAwareProjectedTuple(
+ TUPLE, new int[projectionSize]
+ );
+
+ assertThat(projection1.elementCount(), equalTo(projectionSize));
+ assertThat(projection2.elementCount(), equalTo(projectionSize));
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testProjection(boolean useOptimizeProjection) {
+ int f1 = RND.nextInt(ALL_TYPES_SCHEMA.elementCount());
+ int f2 = RND.nextInt(ALL_TYPES_SCHEMA.elementCount());
+ int f3 = RND.nextInt(ALL_TYPES_SCHEMA.elementCount());
+
+ int[] projection = {f1, f2, f3};
+
+ InternalTuple projectedTuple = useOptimizeProjection
+ ? new FormatAwareProjectedTuple(TUPLE, projection)
+ : new FieldDeserializingProjectedTuple(ALL_TYPES_SCHEMA,
TUPLE, projection);
+
+ Element e1 = ALL_TYPES_SCHEMA.element(f1);
+ Element e2 = ALL_TYPES_SCHEMA.element(f2);
+ Element e3 = ALL_TYPES_SCHEMA.element(f3);
+
+ BinaryTupleSchema projectedSchema = BinaryTupleSchema.create(new
Element[] {
+ e1, e2, e3
+ });
+
+ assertThat(projectedSchema.value(projectedTuple, 0),
equalTo(ALL_TYPES_SCHEMA.value(TUPLE, f1)));
+ assertThat(projectedSchema.value(projectedTuple, 1),
equalTo(ALL_TYPES_SCHEMA.value(TUPLE, f2)));
+ assertThat(projectedSchema.value(projectedTuple, 2),
equalTo(ALL_TYPES_SCHEMA.value(TUPLE, f3)));
+
+ InternalTuple restored = new BinaryTuple(projection.length,
projectedTuple.byteBuffer());
+
+ assertThat(projectedSchema.value(restored, 0),
equalTo(ALL_TYPES_SCHEMA.value(TUPLE, f1)));
+ assertThat(projectedSchema.value(restored, 1),
equalTo(ALL_TYPES_SCHEMA.value(TUPLE, f2)));
+ assertThat(projectedSchema.value(restored, 2),
equalTo(ALL_TYPES_SCHEMA.value(TUPLE, f3)));
+ }
+
+ private static NativeType fromElement(Element element) {
+ switch (element.typeSpec()) {
+ case BOOLEAN:
+ return NativeTypes.BOOLEAN;
+ case INT8:
+ return NativeTypes.INT8;
+ case INT16:
+ return NativeTypes.INT16;
+ case INT32:
+ return NativeTypes.INT32;
+ case INT64:
+ return NativeTypes.INT64;
+ case FLOAT:
+ return NativeTypes.FLOAT;
+ case DOUBLE:
+ return NativeTypes.DOUBLE;
+ case DECIMAL:
+ return NativeTypes.decimalOf(20, element.decimalScale());
+ case NUMBER:
+ return NativeTypes.numberOf(20);
+ case DATE:
+ return NativeTypes.DATE;
+ case TIME:
+ return NativeTypes.time();
+ case DATETIME:
+ return NativeTypes.datetime();
+ case TIMESTAMP:
+ return NativeTypes.timestamp();
+ case UUID:
+ return NativeTypes.UUID;
+ case BITMASK:
+ return NativeTypes.bitmaskOf(256);
+ case STRING:
+ return NativeTypes.stringOf(256);
+ case BYTES:
+ return NativeTypes.blobOf(256);
+ default:
+ throw new IllegalArgumentException("Unknown type: " +
element.typeSpec());
+ }
+ }
+}