This is an automated email from the ASF dual-hosted git repository.
ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 6df2cc270a IGNITE-19791 Sql. Introduce SqlRowHandler that supports
binary tuple rows (#2512)
6df2cc270a is described below
commit 6df2cc270ab8c870b547303b90fb60ee109ac04e
Author: Pavel Pereslegin <[email protected]>
AuthorDate: Thu Sep 7 15:23:40 2023 +0300
IGNITE-19791 Sql. Introduce SqlRowHandler that supports binary tuple rows
(#2512)
---
.../internal/sql/engine/ItSqlOperatorsTest.java | 7 +-
.../ignite/internal/schema/row/InternalTuple.java | 16 +
.../internal/sql/engine/SqlQueryProcessor.java | 4 +-
.../internal/sql/engine/exec/RowHandler.java | 24 ++
.../internal/sql/engine/exec/SqlRowHandler.java | 411 +++++++++++++++++++++
.../sql/engine/exec/TableRowConverterImpl.java | 48 ++-
.../sql/engine/exec/exp/ExpressionFactoryImpl.java | 13 +-
.../sql/engine/exec/exp/agg/Accumulators.java | 65 +++-
.../internal/sql/engine/exec/rel/ModifyNode.java | 23 +-
.../sql/engine/exec/row/RowSchemaTypes.java | 24 ++
.../ignite/internal/sql/engine/util/TypeUtils.java | 2 +-
.../sql/engine/exec/ExecutionServiceImplTest.java | 1 +
.../exec/IdentityDistributionFunctionSelfTest.java | 18 +-
.../sql/engine/exec/RuntimeSortedIndexTest.java | 1 +
.../sql/engine/exec/rel/AbstractExecutionTest.java | 8 +-
.../engine/exec/rel/MergeJoinExecutionTest.java | 2 +-
.../engine/exec/rel/ScannableTableSelfTest.java | 2 +-
.../sql/engine/exec/row/SqlRowHandlerTest.java | 254 +++++++++++++
.../sql/engine/framework}/ArrayRowHandler.java | 22 +-
.../sql/engine/framework/TestBuilders.java | 1 -
.../internal/sql/engine/framework/TestNode.java | 1 -
.../sql/engine/util/HashFunctionsTest.java | 2 +-
.../internal/sql/engine/util/SqlTestUtils.java | 3 +-
23 files changed, 874 insertions(+), 78 deletions(-)
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlOperatorsTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlOperatorsTest.java
index 3c5f6b0596..b4b03f8c6c 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlOperatorsTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlOperatorsTest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.sql.engine;
-import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.Period;
@@ -193,13 +192,15 @@ public class ItSqlOperatorsTest extends
ClusterPerClassIntegrationTest {
assertExpression("COT(1)").returns(1.0d / Math.tan(1)).check();
assertExpression("DEGREES(1)").returns(Math.toDegrees(1)).check();
assertExpression("RADIANS(1)").returns(Math.toRadians(1)).check();
- assertExpression("ROUND(1.7)").returns(BigDecimal.valueOf(2)).check();
+ // TODO https://issues.apache.org/jira/browse/IGNITE-20311
+ //
assertExpression("ROUND(1.7)").returns(BigDecimal.valueOf(2)).check();
assertExpression("SIGN(-5)").returns(-1).check();
assertExpression("SIN(1)").returns(Math.sin(1)).check();
assertExpression("SINH(1)").returns(Math.sinh(1)).check();
assertExpression("TAN(1)").returns(Math.tan(1)).check();
assertExpression("TANH(1)").returns(Math.tanh(1)).check();
-
assertExpression("TRUNCATE(1.7)").returns(BigDecimal.valueOf(1)).check();
+ // TODO https://issues.apache.org/jira/browse/IGNITE-20311
+ //
assertExpression("TRUNCATE(1.7)").returns(BigDecimal.valueOf(1)).check();
assertExpression("PI").returns(Math.PI).check();
}
diff --git
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/InternalTuple.java
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/InternalTuple.java
index 642f5ccdd2..c3e242e7bc 100644
---
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/InternalTuple.java
+++
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/InternalTuple.java
@@ -46,6 +46,22 @@ public interface InternalTuple {
*/
boolean hasNullValue(int col);
+ /**
+ * Reads value for specified column.
+ *
+ * @param col Column index.
+ * @return Column value.
+ */
+ boolean booleanValue(int col);
+
+ /**
+ * Reads value for specified column.
+ *
+ * @param col Column index.
+ * @return Column value.
+ */
+ Boolean booleanValueBoxed(int col);
+
/**
* Reads value for specified column.
*
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index a6b496c6ad..8c7bc44509 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -53,7 +53,6 @@ import org.apache.ignite.internal.manager.EventListener;
import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.schema.SchemaManager;
-import org.apache.ignite.internal.sql.engine.exec.ArrayRowHandler;
import org.apache.ignite.internal.sql.engine.exec.ExchangeServiceImpl;
import org.apache.ignite.internal.sql.engine.exec.ExecutableTableRegistryImpl;
import
org.apache.ignite.internal.sql.engine.exec.ExecutionDependencyResolverImpl;
@@ -64,6 +63,7 @@ import
org.apache.ignite.internal.sql.engine.exec.MailboxRegistryImpl;
import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl;
import org.apache.ignite.internal.sql.engine.exec.QueryValidationException;
+import org.apache.ignite.internal.sql.engine.exec.SqlRowHandler;
import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandlerWrapper;
import org.apache.ignite.internal.sql.engine.message.MessageServiceImpl;
import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
@@ -282,7 +282,7 @@ public class SqlQueryProcessor implements QueryProcessor {
sqlSchemaManager,
ddlCommandHandler,
taskExecutor,
- ArrayRowHandler.INSTANCE,
+ SqlRowHandler.INSTANCE,
mailboxRegistry,
exchangeService,
dependencyResolver
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RowHandler.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RowHandler.java
index 28ccfafe8b..406b0dc7aa 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RowHandler.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RowHandler.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.sql.engine.exec;
import java.nio.ByteBuffer;
+import org.apache.ignite.internal.schema.row.InternalTuple;
import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
import org.jetbrains.annotations.Nullable;
@@ -44,6 +45,21 @@ public interface RowHandler<RowT> {
/** Concatenate two rows. */
RowT concat(RowT left, RowT right);
+ /**
+ * Creates a new row containing only the fields specified in the provided
mapping.
+ *
+ * <p>For example:
+ * <pre>
+ * source row [5, 6, 7, 8] apply mapping [1, 3]
+ * result row will be [6, 8]
+ * </pre>
+ *
+ * @param row Source row.
+ * @param mapping Target field indexes.
+ * @return A new row with fields from the specified mapping.
+ */
+ RowT map(RowT row, int[] mapping);
+
/** Return column count contained in the incoming row. */
int columnCount(RowT row);
@@ -87,5 +103,13 @@ public interface RowHandler<RowT> {
* @return Instantiation defined representation.
*/
RowT create(ByteBuffer raw);
+
+ /**
+ * Create row using incoming binary tuple.
+ *
+ * @param tuple {@link InternalTuple} representation.
+ * @return Instantiation defined representation.
+ */
+ RowT create(InternalTuple tuple);
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlRowHandler.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlRowHandler.java
new file mode 100644
index 0000000000..3ff3d5fcf9
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlRowHandler.java
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import static
org.apache.ignite.internal.sql.engine.exec.exp.ExpressionFactoryImpl.UNSPECIFIED_VALUE_PLACEHOLDER;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.BitSet;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.DecimalNativeType;
+import org.apache.ignite.internal.schema.InvalidTypeException;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypeSpec;
+import org.apache.ignite.internal.schema.row.InternalTuple;
+import org.apache.ignite.internal.sql.engine.exec.SqlRowHandler.RowWrapper;
+import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
+import org.apache.ignite.internal.sql.engine.exec.row.RowSchema.Builder;
+import org.apache.ignite.internal.sql.engine.exec.row.RowSchemaTypes;
+import org.apache.ignite.internal.sql.engine.exec.row.TypeSpec;
+import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.sql.engine.util.TypeUtils;
+import org.apache.ignite.lang.IgniteStringBuilder;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Handler that uses a {@link RowWrapper wrapper} to operate with two types of
row implementations.
+ *
+ * <ul>
+ * <li>{@link ObjectsArrayRowWrapper Objects array}</li>
+ * <li>{@link BinaryTupleRowWrapper Binary tuple}</li>
+ * </ul>
+ *
+ * <p>Each kind of rows is serialized to the same binary tuple format
+ * using the {@link #toByteBuffer(RowWrapper) toByteBuffer} method.
+ *
+ * <p>Factory methods {@link RowFactory#create(InternalTuple)
wrap(InternalTuple)} and
+ * {@link RowFactory#create(ByteBuffer) create(ByteBuffer)} allow create rows
without
+ * any additional conversions. But the fields in binary tuple must match the
+ * factory {@link RowSchema row schema}.
+ */
+public class SqlRowHandler implements RowHandler<RowWrapper> {
+ public static final RowHandler<RowWrapper> INSTANCE = new SqlRowHandler();
+
+ private SqlRowHandler() {
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public @Nullable Object get(int field, RowWrapper row) {
+ return row.get(field);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void set(int field, RowWrapper row, @Nullable Object val) {
+ row.set(field, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public RowWrapper concat(RowWrapper left, RowWrapper right) {
+ int leftLen = left.columnsCount();
+ int rightLen = right.columnsCount();
+ List<TypeSpec> leftTypes = left.rowSchema().fields();
+ List<TypeSpec> rightTypes = right.rowSchema().fields();
+
+ Object[] values = new Object[leftLen + rightLen];
+ Builder schemaBuilder = RowSchema.builder();
+
+ for (int i = 0; i < leftLen; i++) {
+ values[i] = left.get(i);
+ schemaBuilder.addField(leftTypes.get(i));
+ }
+
+ for (int i = 0; i < rightLen; i++) {
+ values[leftLen + i] = right.get(i);
+ schemaBuilder.addField(rightTypes.get(i));
+ }
+
+ return new ObjectsArrayRowWrapper(schemaBuilder.build(), values);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public RowWrapper map(RowWrapper row, int[] mapping) {
+ Object[] fields = new Object[mapping.length];
+
+ for (int i = 0; i < mapping.length; i++) {
+ fields[i] = row.get(mapping[i]);
+ }
+
+ return new ObjectsArrayRowWrapper(row.rowSchema(), fields);
+ }
+
+ @Override
+ public int columnCount(RowWrapper row) {
+ return row.columnsCount();
+ }
+
+ @Override
+ public String toString(RowWrapper row) {
+ IgniteStringBuilder buf = new IgniteStringBuilder("Row[");
+ int maxIdx = columnCount(row) - 1;
+
+ for (int i = 0; i <= maxIdx; i++) {
+ buf.app(row.get(i));
+
+ if (i != maxIdx) {
+ buf.app(", ");
+ }
+ }
+
+ return buf.app(']').toString();
+ }
+
+ @Override
+ public ByteBuffer toByteBuffer(RowWrapper row) {
+ return row.toByteBuffer();
+ }
+
+ @Override
+ public RowFactory<RowWrapper> factory(RowSchema rowSchema) {
+ int schemaLen = rowSchema.fields().size();
+
+ return new RowFactory<>() {
+ /** {@inheritDoc} */
+ @Override
+ public RowHandler<RowWrapper> handler() {
+ return SqlRowHandler.this;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public RowWrapper create() {
+ return create(new Object[schemaLen]);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public RowWrapper create(Object... fields) {
+ assert fields.length == schemaLen;
+
+ return new ObjectsArrayRowWrapper(rowSchema, fields);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public RowWrapper create(ByteBuffer buf) {
+ return create(new BinaryTuple(schemaLen, buf));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public RowWrapper create(InternalTuple tuple) {
+ assert schemaLen == tuple.elementCount();
+
+ return new BinaryTupleRowWrapper(rowSchema, tuple);
+ }
+ };
+ }
+
+ /**
+ * Provides the ability for a single {@link RowHandler} instance to
interact with multiple row implementations.
+ */
+ public abstract static class RowWrapper {
+ abstract int columnsCount();
+
+ abstract RowSchema rowSchema();
+
+ abstract @Nullable Object get(int field);
+
+ abstract void set(int field, Object value);
+
+ abstract ByteBuffer toByteBuffer();
+ }
+
+ /**
+ * Wrapper over an array of objects.
+ */
+ private static class ObjectsArrayRowWrapper extends RowWrapper {
+ private final RowSchema rowSchema;
+ private final Object[] row;
+
+ ObjectsArrayRowWrapper(RowSchema rowSchema, Object[] row) {
+ this.rowSchema = rowSchema;
+ this.row = row;
+ }
+
+ @Override
+ int columnsCount() {
+ return row.length;
+ }
+
+ @Override
+ @Nullable Object get(int field) {
+ return row[field];
+ }
+
+ @Override
+ void set(int field, @Nullable Object value) {
+ row[field] = value;
+ }
+
+ @Override
+ ByteBuffer toByteBuffer() {
+ BinaryTupleBuilder tupleBuilder = new
BinaryTupleBuilder(row.length);
+
+ for (int i = 0; i < row.length; i++) {
+ Object value = row[i];
+
+ assert value != UNSPECIFIED_VALUE_PLACEHOLDER : "Invalid row
value.";
+
+ appendValue(tupleBuilder, rowSchema.fields().get(i), value);
+ }
+
+ return tupleBuilder.build();
+ }
+
+ @Override
+ RowSchema rowSchema() {
+ return rowSchema;
+ }
+
+ private void appendValue(BinaryTupleBuilder builder, TypeSpec
schemaType, @Nullable Object value) {
+ if (value == null) {
+ builder.appendNull();
+
+ return;
+ }
+
+ NativeType nativeType = RowSchemaTypes.toNativeType(schemaType);
+
+ value = TypeUtils.fromInternal(value,
NativeTypeSpec.toClass(nativeType.spec(), schemaType.isNullable()));
+
+ assert value != null : nativeType;
+
+ switch (nativeType.spec()) {
+ case BOOLEAN:
+ builder.appendBoolean((boolean) value);
+ break;
+
+ case INT8:
+ builder.appendByte((byte) value);
+ break;
+
+ case INT16:
+ builder.appendShort((short) value);
+ break;
+
+ case INT32:
+ builder.appendInt((int) value);
+ break;
+
+ case INT64:
+ builder.appendLong((long) value);
+ break;
+
+ case FLOAT:
+ builder.appendFloat((float) value);
+ break;
+
+ case DOUBLE:
+ builder.appendDouble((double) value);
+ break;
+
+ case NUMBER:
+ builder.appendNumberNotNull((BigInteger) value);
+ break;
+
+ case DECIMAL:
+ builder.appendDecimalNotNull((BigDecimal) value,
((DecimalNativeType) nativeType).scale());
+ break;
+
+ case UUID:
+ builder.appendUuidNotNull((UUID) value);
+ break;
+
+ case BYTES:
+ builder.appendBytesNotNull((byte[]) value);
+ break;
+
+ case STRING:
+ builder.appendStringNotNull((String) value);
+ break;
+
+ case BITMASK:
+ builder.appendBitmaskNotNull((BitSet) value);
+ break;
+
+ case DATE:
+ builder.appendDateNotNull((LocalDate) value);
+ break;
+
+ case TIME:
+ builder.appendTimeNotNull((LocalTime) value);
+ break;
+
+ case DATETIME:
+ builder.appendDateTimeNotNull((LocalDateTime) value);
+ break;
+
+ case TIMESTAMP:
+ builder.appendTimestampNotNull((Instant) value);
+ break;
+
+ default:
+ throw new UnsupportedOperationException("Unknown type " +
nativeType);
+ }
+ }
+ }
+
+ /**
+ * Wrapper over an {@link BinaryTuple}.
+ *
+ * <p>Since {@link BinaryTuple binary tuple} is immutable this wrapper
doesn't support {@link #set(int, Object)} operation.
+ */
+ private static class BinaryTupleRowWrapper extends RowWrapper {
+ private final RowSchema rowSchema;
+ private final InternalTuple tuple;
+
+ BinaryTupleRowWrapper(RowSchema rowSchema, InternalTuple tuple) {
+ this.rowSchema = rowSchema;
+ this.tuple = tuple;
+ }
+
+ @Override
+ int columnsCount() {
+ return tuple.elementCount();
+ }
+
+ @Override
+ @Nullable Object get(int field) {
+ NativeType nativeType =
RowSchemaTypes.toNativeType(rowSchema.fields().get(field));
+
+ if (nativeType == null) {
+ return null;
+ }
+
+ Object value = readValue(tuple, nativeType, field);
+
+ if (value == null) {
+ return null;
+ }
+
+ return TypeUtils.toInternal(value,
Commons.nativeTypeToClass(nativeType));
+ }
+
+ @Override
+ void set(int field, @Nullable Object value) {
+ // TODO https://issues.apache.org/jira/browse/IGNITE-20356
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ ByteBuffer toByteBuffer() {
+ return tuple.byteBuffer();
+ }
+
+ @Override
+ RowSchema rowSchema() {
+ return rowSchema;
+ }
+
+ private @Nullable Object readValue(InternalTuple tuple, NativeType
nativeType, int fieldIndex) {
+ switch (nativeType.spec()) {
+ case BOOLEAN: return tuple.booleanValueBoxed(fieldIndex);
+ case INT8: return tuple.byteValueBoxed(fieldIndex);
+ case INT16: return tuple.shortValueBoxed(fieldIndex);
+ case INT32: return tuple.intValueBoxed(fieldIndex);
+ case INT64: return tuple.longValueBoxed(fieldIndex);
+ case FLOAT: return tuple.floatValueBoxed(fieldIndex);
+ case DOUBLE: return tuple.doubleValueBoxed(fieldIndex);
+ case DECIMAL: return tuple.decimalValue(fieldIndex,
((DecimalNativeType) nativeType).scale());
+ case UUID: return tuple.uuidValue(fieldIndex);
+ case STRING: return tuple.stringValue(fieldIndex);
+ case BYTES: return tuple.bytesValue(fieldIndex);
+ case BITMASK: return tuple.bitmaskValue(fieldIndex);
+ case NUMBER: return tuple.numberValue(fieldIndex);
+ case DATE: return tuple.dateValue(fieldIndex);
+ case TIME: return tuple.timeValue(fieldIndex);
+ case DATETIME: return tuple.dateTimeValue(fieldIndex);
+ case TIMESTAMP: return tuple.timestampValue(fieldIndex);
+ default: throw new InvalidTypeException("Unknown element type:
" + nativeType);
+ }
+ }
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TableRowConverterImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TableRowConverterImpl.java
index dc81c12c76..5c58079614 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TableRowConverterImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TableRowConverterImpl.java
@@ -18,13 +18,15 @@
package org.apache.ignite.internal.sql.engine.exec;
import java.util.BitSet;
+import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryRowConverter;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.row.Row;
-import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
-import org.apache.ignite.internal.sql.engine.util.TypeUtils;
import org.jetbrains.annotations.Nullable;
/**
@@ -38,11 +40,15 @@ public class TableRowConverterImpl implements
TableRowConverter {
private final TableDescriptor desc;
+ private final BinaryTupleSchema binaryTupleSchema;
+
/** Constructor. */
public TableRowConverterImpl(SchemaRegistry schemaRegistry,
SchemaDescriptor schemaDescriptor, TableDescriptor desc) {
this.schemaRegistry = schemaRegistry;
this.schemaDescriptor = schemaDescriptor;
this.desc = desc;
+
+ this.binaryTupleSchema =
BinaryTupleSchema.createRowSchema(schemaDescriptor);
}
/** {@inheritDoc} */
@@ -53,30 +59,36 @@ public class TableRowConverterImpl implements
TableRowConverter {
RowHandler.RowFactory<RowT> factory,
@Nullable BitSet requiredColumns
) {
- RowHandler<RowT> handler = factory.handler();
+ Row row = schemaRegistry.resolve(binaryRow, schemaDescriptor);
- assert handler == ectx.rowHandler();
+ BinaryTuple tuple = requiredColumns == null
+ ? allColumnsTuple(row, binaryTupleSchema)
+ : requiredColumnsTuple(row, binaryTupleSchema,
requiredColumns);
- RowT res = factory.create();
+ return factory.create(tuple);
+ }
- assert handler.columnCount(res) == (requiredColumns == null ?
desc.columnsCount() : requiredColumns.cardinality());
+ private BinaryTuple allColumnsTuple(Row row, BinaryTupleSchema
binarySchema) {
+ BinaryTupleBuilder tupleBuilder = new
BinaryTupleBuilder(desc.columnsCount());
- Row row = schemaRegistry.resolve(binaryRow, schemaDescriptor);
+ for (int i = 0; i < desc.columnsCount(); i++) {
+ int index = desc.columnDescriptor(i).physicalIndex();
+
+ BinaryRowConverter.appendValue(tupleBuilder,
binarySchema.element(index), binarySchema.value(row, index));
+ }
+
+ return new BinaryTuple(tupleBuilder.numElements(),
tupleBuilder.build());
+ }
- if (requiredColumns == null) {
- for (int i = 0; i < desc.columnsCount(); i++) {
- ColumnDescriptor colDesc = desc.columnDescriptor(i);
+ private BinaryTuple requiredColumnsTuple(Row row, BinaryTupleSchema
binarySchema, BitSet requiredColumns) {
+ BinaryTupleBuilder tupleBuilder = new
BinaryTupleBuilder(requiredColumns.cardinality());
- handler.set(i, res,
TypeUtils.toInternal(row.value(colDesc.physicalIndex())));
- }
- } else {
- for (int i = 0, j = requiredColumns.nextSetBit(0); j != -1; j =
requiredColumns.nextSetBit(j + 1), i++) {
- ColumnDescriptor colDesc = desc.columnDescriptor(j);
+ for (int i = requiredColumns.nextSetBit(0); i != -1; i =
requiredColumns.nextSetBit(i + 1)) {
+ int index = desc.columnDescriptor(i).physicalIndex();
- handler.set(i, res,
TypeUtils.toInternal(row.value(colDesc.physicalIndex())));
- }
+ BinaryRowConverter.appendValue(tupleBuilder,
binarySchema.element(index), binarySchema.value(row, index));
}
- return res;
+ return new BinaryTuple(tupleBuilder.numElements(),
tupleBuilder.build());
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImpl.java
index ed38d522c3..700e102f68 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImpl.java
@@ -277,8 +277,17 @@ public class ExpressionFactoryImpl<RowT> implements
ExpressionFactory<RowT> {
/** {@inheritDoc} */
@Override
public <T> Supplier<T> execute(RexNode node) {
- RelDataType nodeType = node.getType();
- RowSchema rowSchema =
TypeUtils.rowSchemaFromRelTypes(List.of(nodeType));
+ RelDataType exprType = node.getType();
+ List<RelDataType> typesList;
+
+ if (exprType.getSqlTypeName() == SqlTypeName.ROW) {
+ // Convert a row returned from a table function into a list of
columns.
+ typesList = RelOptUtil.getFieldTypeList(exprType);
+ } else {
+ typesList = List.of(exprType);
+ }
+
+ RowSchema rowSchema = TypeUtils.rowSchemaFromRelTypes(typesList);
RowFactory<RowT> factory = ctx.rowHandler().factory(rowSchema);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulators.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulators.java
index 458cf56dfc..2ff04c5950 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulators.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulators.java
@@ -88,11 +88,11 @@ public class Accumulators {
case "SOME":
return minMaxFactory(false, call);
case "SINGLE_VALUE":
- return SingleVal.FACTORY;
- case "LITERAL_AGG":
- return LiteralVal.FACTORY;
+ return singleValueFactory(call);
case "ANY_VALUE":
- return AnyVal.FACTORY;
+ return anyValueFactory(call);
+ case "LITERAL_AGG":
+ return
LiteralVal.newAccumulator(typeFactory.createSqlType(BOOLEAN));
default:
throw new AssertionError(call.getAggregation().getName());
}
@@ -186,6 +186,26 @@ public class Accumulators {
}
}
+ private Supplier<Accumulator> singleValueFactory(AggregateCall call) {
+ RelDataType type = call.getType();
+
+ if (type.getSqlTypeName() == ANY && !(type instanceof
IgniteCustomType)) {
+ throw unsupportedAggregateFunction(call);
+ }
+
+ return SingleVal.newAccumulator(type);
+ }
+
+ private Supplier<Accumulator> anyValueFactory(AggregateCall call) {
+ RelDataType type = call.getType();
+
+ if (type.getSqlTypeName() == ANY && !(type instanceof
IgniteCustomType)) {
+ throw unsupportedAggregateFunction(call);
+ }
+
+ return AnyVal.newAccumulator(type);
+ }
+
/**
* SingleVal.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
@@ -193,7 +213,13 @@ public class Accumulators {
private static class SingleVal extends AnyVal {
private boolean touched;
- public static final Supplier<Accumulator> FACTORY = SingleVal::new;
+ private SingleVal(RelDataType type) {
+ super(type);
+ }
+
+ static Supplier<Accumulator> newAccumulator(RelDataType type) {
+ return () -> new SingleVal(type);
+ }
/** {@inheritDoc} */
@Override
@@ -213,14 +239,12 @@ public class Accumulators {
* Calcite`s implementation RexImpTable#LiteralAggImplementor.
*/
private static class LiteralVal extends AnyVal {
- public static final Supplier<Accumulator> FACTORY = LiteralVal::new;
-
- /** {@inheritDoc} */
- @Override
- public void add(Object... args) {
- assert args.length == 1 : args.length;
+ private LiteralVal(RelDataType type) {
+ super(type);
+ }
- super.add(args);
+ static Supplier<Accumulator> newAccumulator(RelDataType type) {
+ return () -> new LiteralVal(type);
}
/** {@inheritDoc} */
@@ -228,12 +252,6 @@ public class Accumulators {
public Object end() {
return holder != null;
}
-
- /** {@inheritDoc} */
- @Override
- public RelDataType returnType(IgniteTypeFactory typeFactory) {
- return typeFactory.createSqlType(BOOLEAN);
- }
}
/**
@@ -242,8 +260,15 @@ public class Accumulators {
private static class AnyVal implements Accumulator {
protected Object holder;
- public static final Supplier<Accumulator> FACTORY = AnyVal::new;
+ private final RelDataType type;
+ private AnyVal(RelDataType type) {
+ this.type = type;
+ }
+
+ static Supplier<Accumulator> newAccumulator(RelDataType type) {
+ return () -> new AnyVal(type);
+ }
/** {@inheritDoc} */
@Override
@@ -270,7 +295,7 @@ public class Accumulators {
/** {@inheritDoc} */
@Override
public RelDataType returnType(IgniteTypeFactory typeFactory) {
- return typeFactory.createSqlType(ANY);
+ return type;
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java
index b1b58a9719..1765e40cb7 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java
@@ -268,15 +268,24 @@ public class ModifyNode<RowT> extends AbstractNode<RowT>
implements SingleNode<R
RowHandler<RowT> handler = context().rowHandler();
- for (RowT row : rows) {
- for (int i = 0; i < mapping.length; i++) {
- if (offset == 0 && i == mapping[i]) {
- continue;
- }
+ int[] targetMapping = applyOffset(mapping, offset);
- handler.set(i, row, handler.get(mapping[i] + offset, row));
- }
+ rows.replaceAll(row -> handler.map(row, targetMapping));
+ }
+
+ /** Adds the provided offset to each value in the mapping. */
+ private int[] applyOffset(int[] srcMapping, int offset) {
+ if (offset == 0) {
+ return srcMapping;
}
+
+ int[] targetMapping = new int[srcMapping.length];
+
+ for (int i = 0; i < targetMapping.length; i++) {
+ targetMapping[i] = srcMapping[i] + offset;
+ }
+
+ return targetMapping;
}
/**
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/row/RowSchemaTypes.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/row/RowSchemaTypes.java
index d66d763f6e..da211f2953 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/row/RowSchemaTypes.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/row/RowSchemaTypes.java
@@ -25,6 +25,7 @@ import java.util.stream.Collectors;
import org.apache.ignite.internal.schema.NativeType;
import org.apache.ignite.internal.schema.NativeTypes;
import org.apache.ignite.internal.util.Pair;
+import org.jetbrains.annotations.Nullable;
/**
* Defines methods to provide instances of {@link TypeSpec}s.
@@ -82,4 +83,27 @@ public final class RowSchemaTypes {
return new BaseTypeSpec(nativeType, nullable);
}
}
+
+ /**
+ * Convert specified schema {@link TypeSpec type} to {@link NativeType
native type}.
+ *
+ * @param type Row schema type.
+ * @return Native type or {@code null} if type is a {@link NullTypeSpec}.
+ * @throws IllegalArgumentException If provided type cannot be converted
to a native type.
+ */
+ public static @Nullable NativeType toNativeType(TypeSpec type) {
+ if (type instanceof NullTypeSpec) {
+ return null;
+ }
+
+ if (!(type instanceof BaseTypeSpec)) {
+ throw new IllegalArgumentException("Unexpected type; " + type);
+ }
+
+ NativeType nativeType = ((BaseTypeSpec) type).nativeType();
+
+ assert nativeType != null : type;
+
+ return nativeType;
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/TypeUtils.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/TypeUtils.java
index d09f7a5e89..65ac0da499 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/TypeUtils.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/TypeUtils.java
@@ -668,7 +668,7 @@ public class TypeUtils {
// Add collection types support
throw new IllegalArgumentException("Collection types is not
supported: " + type);
} else if (SqlTypeUtil.isArray(type) || type instanceof JavaType) {
- // TODO Remove after is fixed
https://issues.apache.org/jira/browse/IGNITE-19992
+ // TODO Remove after is fixed
https://issues.apache.org/jira/browse/IGNITE-20336
// Move SqlTypeUtil.isArray(type) to collections type support
branch.
return new BaseTypeSpec(null, true);
} else {
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index dbd444c0a4..a0fcc755c4 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -69,6 +69,7 @@ import org.apache.ignite.internal.sql.engine.exec.rel.Inbox;
import org.apache.ignite.internal.sql.engine.exec.rel.Node;
import org.apache.ignite.internal.sql.engine.exec.rel.Outbox;
import org.apache.ignite.internal.sql.engine.exec.rel.ScanNode;
+import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
import org.apache.ignite.internal.sql.engine.framework.NoOpTransaction;
import org.apache.ignite.internal.sql.engine.framework.TestTable;
import
org.apache.ignite.internal.sql.engine.message.ExecutionContextAwareMessage;
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/IdentityDistributionFunctionSelfTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/IdentityDistributionFunctionSelfTest.java
index 2b46f736b7..2d8b5d9437 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/IdentityDistributionFunctionSelfTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/IdentityDistributionFunctionSelfTest.java
@@ -15,23 +15,6 @@
* limitations under the License.
*/
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
package org.apache.ignite.internal.sql.engine.exec;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -45,6 +28,7 @@ import java.util.List;
import org.apache.calcite.rel.RelDistribution.Type;
import org.apache.ignite.internal.schema.NativeTypes;
import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
+import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
import org.apache.ignite.internal.sql.engine.metadata.ColocationGroup;
import org.apache.ignite.internal.sql.engine.trait.Destination;
import
org.apache.ignite.internal.sql.engine.trait.DistributionFunction.IdentityDistribution;
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
index 43fa095526..0f7b63813e 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
@@ -34,6 +34,7 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
import org.apache.ignite.internal.sql.engine.util.Commons;
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
index 5dddf6e991..91146408b3 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
@@ -36,11 +36,12 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.ignite.internal.sql.engine.exec.ArrayRowHandler;
+import org.apache.ignite.internal.schema.row.InternalTuple;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
import org.apache.ignite.internal.sql.engine.exec.TxAttributes;
+import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
import org.apache.ignite.internal.sql.engine.framework.NoOpTransaction;
import org.apache.ignite.internal.sql.engine.metadata.FragmentDescription;
import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
@@ -361,6 +362,11 @@ public abstract class AbstractExecutionTest extends
IgniteAbstractTest {
public Object[] create(ByteBuffer raw) {
return ByteUtils.fromBytes(raw.array());
}
+
+ @Override
+ public Object[] create(InternalTuple tuple) {
+ throw new UnsupportedOperationException();
+ }
};
}
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java
index 29e733486f..bce573e224 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java
@@ -40,9 +40,9 @@ import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.validate.SqlConformanceEnum;
import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.ignite.internal.sql.engine.exec.ArrayRowHandler;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.exp.ExpressionFactoryImpl;
+import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
import org.apache.ignite.internal.sql.engine.util.Commons;
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
index 2d23497d71..c9ab8ffe5e 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
@@ -60,7 +60,6 @@ import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTuplePrefix;
import org.apache.ignite.internal.schema.NativeTypes;
-import org.apache.ignite.internal.sql.engine.exec.ArrayRowHandler;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
@@ -70,6 +69,7 @@ import
org.apache.ignite.internal.sql.engine.exec.TableRowConverter;
import org.apache.ignite.internal.sql.engine.exec.TxAttributes;
import org.apache.ignite.internal.sql.engine.exec.exp.RangeCondition;
import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
+import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
import org.apache.ignite.internal.sql.engine.framework.NoOpTransaction;
import org.apache.ignite.internal.sql.engine.metadata.PartitionWithTerm;
import
org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest.TestTableDescriptor;
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/row/SqlRowHandlerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/row/SqlRowHandlerTest.java
new file mode 100644
index 0000000000..4a7bdc7bfe
--- /dev/null
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/row/SqlRowHandlerTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.row;
+
+import static
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.generateValueByType;
+import static
org.apache.ignite.internal.sql.engine.util.TypeUtils.columnType2NativeType;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
+import org.apache.ignite.internal.sql.engine.exec.SqlRowHandler;
+import org.apache.ignite.internal.sql.engine.exec.SqlRowHandler.RowWrapper;
+import org.apache.ignite.internal.sql.engine.exec.row.RowSchema.Builder;
+import org.apache.ignite.internal.sql.engine.util.TypeUtils;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.sql.ColumnType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Named;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Tests for {@link SqlRowHandler}.
+ */
+public class SqlRowHandlerTest extends IgniteAbstractTest {
+ private static final RowHandler<RowWrapper> handler =
SqlRowHandler.INSTANCE;
+
+ private final long seed = ThreadLocalRandom.current().nextLong();
+
+ private final Random rnd = new Random(seed);
+
+ @BeforeEach
+ void printSeed() {
+ log.info("Using seed: " + seed);
+ }
+
+ @Test
+ public void testBytebufferSerialization() {
+ List<ColumnType> columnTypes = columnTypes();
+ Object[] sourceData = values(columnTypes);
+ RowSchema schema = rowSchema(columnTypes, sourceData);
+
+ int elementsCount = schema.fields().size();
+
+ RowFactory<RowWrapper> factory = handler.factory(schema);
+ RowWrapper src = factory.create(sourceData);
+
+ // Serialization to binary tuple representation.
+ ByteBuffer buf = handler.toByteBuffer(src);
+
+ BinaryTuple tuple = new BinaryTuple(elementsCount, buf);
+ RowWrapper destWrap = factory.create(tuple);
+ RowWrapper dest = factory.create(buf);
+
+ for (int i = 0; i < elementsCount; i++) {
+ String msg = schema.fields().get(i).toString();
+
+ assertThat(msg, handler.get(i, src), equalTo(sourceData[i]));
+
+ // Binary tuple wrapper must return data in internal format.
+ Object expected = TypeUtils.toInternal(sourceData[i]);
+
+ assertThat(msg, handler.get(i, dest), equalTo(expected));
+ assertThat(msg, handler.get(i, destWrap), equalTo(expected));
+ }
+ }
+
+ @ParameterizedTest(name = "{0} - {1}")
+ @MethodSource("concatTestArguments")
+ public void testConcat(boolean leftTupleRequired, boolean
rightTupleRequired) {
+ ConcatTestParameters params = new
ConcatTestParameters(leftTupleRequired, rightTupleRequired);
+
+ RowWrapper concatenated = handler.concat(params.left, params.right);
+
+ int leftLen = params.leftData.length;
+ int rightLen = params.rightData.length;
+ int totalElementsCount = leftLen + rightLen;
+
+ assertThat(handler.columnCount(concatenated),
equalTo(totalElementsCount));
+
+ // Build combined schema.
+ Builder builder = RowSchema.builder();
+ params.leftSchema.fields().forEach(builder::addField);
+ params.rightSchema.fields().forEach(builder::addField);
+
+ RowSchema concatenatedSchema = builder.build();
+
+ // Serialize.
+ ByteBuffer buf = handler.toByteBuffer(concatenated);
+
+ // Wrap into row.
+ RowWrapper result = handler.factory(concatenatedSchema).create(new
BinaryTuple(totalElementsCount, buf));
+
+ for (int i = 0; i < leftLen; i++) {
+ assertThat(handler.get(i, result),
equalTo(TypeUtils.toInternal(params.leftData[i])));
+ }
+
+ for (int i = 0; i < rightLen; i++) {
+ assertThat(handler.get(leftLen + i, result),
equalTo(TypeUtils.toInternal(params.rightData[i])));
+ }
+ }
+
+ @Test
+ public void testMap() {
+ List<ColumnType> columnTypes = List.of(
+ ColumnType.INT32,
+ ColumnType.STRING,
+ ColumnType.BOOLEAN,
+ ColumnType.INT32,
+ ColumnType.DOUBLE,
+ ColumnType.STRING
+ );
+
+ int[] mapping = {3, 5};
+
+ Object[] sourceData = values(columnTypes);
+ RowSchema schema = rowSchema(columnTypes, sourceData);
+
+ RowFactory<RowWrapper> factory = handler.factory(schema);
+
+ RowWrapper srcRow = factory.create(sourceData);
+ RowWrapper srcBinRow = factory.create(new
BinaryTuple(handler.columnCount(srcRow), handler.toByteBuffer(srcRow)));
+
+ RowWrapper mappedRow = handler.map(srcRow, mapping);
+ RowWrapper mappedFromBinRow = handler.map(srcBinRow, mapping);
+
+ RowSchema mappedSchema = rowSchema(columnTypes.subList(0,
mapping.length), Arrays.copyOf(sourceData, mapping.length));
+ RowWrapper deserializedMappedBinRow =
handler.factory(mappedSchema).create(handler.toByteBuffer(mappedFromBinRow));
+
+ assertThat(handler.columnCount(mappedRow), equalTo(mapping.length));
+ assertThat(handler.columnCount(mappedFromBinRow),
equalTo(mapping.length));
+
+ for (int i = 0; i < mapping.length; i++) {
+ Object expected = handler.get(mapping[i], srcRow);
+
+ assertThat(handler.get(i, mappedRow), equalTo(expected));
+ assertThat(handler.get(i, mappedFromBinRow), equalTo(expected));
+ assertThat(handler.get(i, deserializedMappedBinRow),
equalTo(expected));
+ }
+ }
+
+ private static Stream<Arguments> concatTestArguments() {
+ return Stream.of(
+ Arguments.of(Named.of("array", false), Named.of("array",
false)),
+ Arguments.of(Named.of("array", false), Named.of("tuple",
true)),
+ Arguments.of(Named.of("tuple", true), Named.of("array",
false)),
+ Arguments.of(Named.of("tuple", true), Named.of("tuple", true))
+ );
+ }
+
+ private RowSchema rowSchema(List<ColumnType> columnTypes, Object[] values)
{
+ Builder schemaBuilder = RowSchema.builder();
+
+ for (int i = 0; i < values.length; i++) {
+ ColumnType type = columnTypes.get(i);
+
+ if (type == ColumnType.NULL) {
+ schemaBuilder.addField(new NullTypeSpec());
+
+ continue;
+ }
+
+ NativeType nativeType = values[i] == null
+ ? columnType2NativeType(type, 9, 3, 20)
+ : NativeTypes.fromObject(values[i]);
+
+ schemaBuilder.addField(nativeType, values[i] == null ||
rnd.nextBoolean());
+ }
+
+ return schemaBuilder.build();
+ }
+
+ private Object[] values(List<ColumnType> columnTypes) {
+ Object[] values = new Object[columnTypes.size()];
+ int baseValue = rnd.nextInt();
+
+ for (int i = 0; i < values.length; i++) {
+ ColumnType type = columnTypes.get(i);
+
+ values[i] = type == ColumnType.NULL ? null :
generateValueByType(baseValue, type);
+ }
+
+ return values;
+ }
+
+ private List<ColumnType> columnTypes() {
+ List<ColumnType> columnTypes = new ArrayList<>(
+ // TODO Include ignored types to test after
https://issues.apache.org/jira/browse/IGNITE-15200
+ EnumSet.complementOf(EnumSet.of(ColumnType.PERIOD,
ColumnType.DURATION))
+ );
+
+ Collections.shuffle(columnTypes, rnd);
+
+ return columnTypes;
+ }
+
+ private final class ConcatTestParameters {
+ final RowSchema leftSchema;
+ final RowSchema rightSchema;
+ final Object[] leftData;
+ final Object[] rightData;
+ final RowWrapper left;
+ final RowWrapper right;
+
+ ConcatTestParameters(boolean leftTupleRequired, boolean
rightTupleRequired) {
+ List<ColumnType> columnTypes1 = columnTypes();
+ List<ColumnType> columnTypes2 = columnTypes();
+
+ leftData = values(columnTypes1);
+ rightData = values(columnTypes2);
+ leftSchema = rowSchema(columnTypes1, leftData);
+ rightSchema = rowSchema(columnTypes2, rightData);
+
+ RowFactory<RowWrapper> factory1 = handler.factory(leftSchema);
+ RowFactory<RowWrapper> factory2 = handler.factory(rightSchema);
+
+ RowWrapper left = factory1.create(leftData);
+ RowWrapper right = factory2.create(rightData);
+
+ this.left = leftTupleRequired ?
factory1.create(handler.toByteBuffer(left)) : left;
+ this.right = rightTupleRequired ?
factory2.create(handler.toByteBuffer(right)) : right;
+ }
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ArrayRowHandler.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ArrayRowHandler.java
similarity index 82%
rename from
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ArrayRowHandler.java
rename to
modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ArrayRowHandler.java
index 13d4ac5bd2..9ff00ae9fa 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ArrayRowHandler.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ArrayRowHandler.java
@@ -15,10 +15,12 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.sql.engine.exec;
+package org.apache.ignite.internal.sql.engine.framework;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import org.apache.ignite.internal.schema.row.InternalTuple;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler;
import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
import org.apache.ignite.internal.util.ArrayUtils;
import org.apache.ignite.internal.util.ByteUtils;
@@ -50,6 +52,18 @@ public class ArrayRowHandler implements RowHandler<Object[]>
{
return ArrayUtils.concat(left, right);
}
+ /** {@inheritDoc} */
+ @Override
+ public Object[] map(Object[] row, int[] mapping) {
+ Object[] newRow = new Object[mapping.length];
+
+ for (int i = 0; i < mapping.length; i++) {
+ newRow[i] = row[mapping[i]];
+ }
+
+ return newRow;
+ }
+
/** {@inheritDoc} */
@Override
public int columnCount(Object[] row) {
@@ -99,6 +113,12 @@ public class ArrayRowHandler implements
RowHandler<Object[]> {
public Object[] create(ByteBuffer raw) {
return ByteUtils.fromBytes(raw.array());
}
+
+ /** {@inheritDoc} */
+ @Override
+ public Object[] create(InternalTuple tuple) {
+ throw new UnsupportedOperationException();
+ }
};
}
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index 6131fef933..9e6b32c6fd 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -34,7 +34,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.internal.schema.NativeType;
-import org.apache.ignite.internal.sql.engine.exec.ArrayRowHandler;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
import
org.apache.ignite.internal.sql.engine.exec.TestExecutableTableRegistry.ColocationGroupProvider;
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
index 59288ca9cb..5df6fc04fb 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
@@ -32,7 +32,6 @@ import org.apache.calcite.tools.Frameworks;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.sql.engine.QueryCancel;
-import org.apache.ignite.internal.sql.engine.exec.ArrayRowHandler;
import org.apache.ignite.internal.sql.engine.exec.ExchangeService;
import org.apache.ignite.internal.sql.engine.exec.ExchangeServiceImpl;
import org.apache.ignite.internal.sql.engine.exec.ExecutionDependencyResolver;
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/HashFunctionsTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/HashFunctionsTest.java
index f771433701..7c4ae04b6d 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/HashFunctionsTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/HashFunctionsTest.java
@@ -20,8 +20,8 @@ package org.apache.ignite.internal.sql.engine.util;
import java.util.Arrays;
import org.apache.ignite.internal.schema.NativeType;
import org.apache.ignite.internal.schema.NativeTypes;
-import org.apache.ignite.internal.sql.engine.exec.ArrayRowHandler;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
import
org.apache.ignite.internal.sql.engine.util.HashFunctionFactory.RowHashFunction;
import
org.apache.ignite.internal.sql.engine.util.HashFunctionFactoryImpl.SimpleHashFunction;
import
org.apache.ignite.internal.sql.engine.util.HashFunctionFactoryImpl.TypesAwareHashFunction;
diff --git
a/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/SqlTestUtils.java
b/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/SqlTestUtils.java
index 25d538c949..df7a1a13e2 100644
---
a/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/SqlTestUtils.java
+++
b/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/SqlTestUtils.java
@@ -32,6 +32,7 @@ import java.time.LocalTime;
import java.time.Period;
import java.time.ZoneId;
import java.time.ZonedDateTime;
+import java.util.BitSet;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -172,7 +173,7 @@ public class SqlTestUtils {
case UUID:
return new UUID(base, base);
case BITMASK:
- return new byte[]{(byte) base};
+ return BitSet.valueOf(BigInteger.valueOf(base).toByteArray());
case DURATION:
return Duration.ofNanos(base);
case DATETIME: