This is an automated email from the ASF dual-hosted git repository.

ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 6df2cc270a IGNITE-19791 Sql. Introduce SqlRowHandler that supports 
binary tuple rows (#2512)
6df2cc270a is described below

commit 6df2cc270ab8c870b547303b90fb60ee109ac04e
Author: Pavel Pereslegin <[email protected]>
AuthorDate: Thu Sep 7 15:23:40 2023 +0300

    IGNITE-19791 Sql. Introduce SqlRowHandler that supports binary tuple rows 
(#2512)
---
 .../internal/sql/engine/ItSqlOperatorsTest.java    |   7 +-
 .../ignite/internal/schema/row/InternalTuple.java  |  16 +
 .../internal/sql/engine/SqlQueryProcessor.java     |   4 +-
 .../internal/sql/engine/exec/RowHandler.java       |  24 ++
 .../internal/sql/engine/exec/SqlRowHandler.java    | 411 +++++++++++++++++++++
 .../sql/engine/exec/TableRowConverterImpl.java     |  48 ++-
 .../sql/engine/exec/exp/ExpressionFactoryImpl.java |  13 +-
 .../sql/engine/exec/exp/agg/Accumulators.java      |  65 +++-
 .../internal/sql/engine/exec/rel/ModifyNode.java   |  23 +-
 .../sql/engine/exec/row/RowSchemaTypes.java        |  24 ++
 .../ignite/internal/sql/engine/util/TypeUtils.java |   2 +-
 .../sql/engine/exec/ExecutionServiceImplTest.java  |   1 +
 .../exec/IdentityDistributionFunctionSelfTest.java |  18 +-
 .../sql/engine/exec/RuntimeSortedIndexTest.java    |   1 +
 .../sql/engine/exec/rel/AbstractExecutionTest.java |   8 +-
 .../engine/exec/rel/MergeJoinExecutionTest.java    |   2 +-
 .../engine/exec/rel/ScannableTableSelfTest.java    |   2 +-
 .../sql/engine/exec/row/SqlRowHandlerTest.java     | 254 +++++++++++++
 .../sql/engine/framework}/ArrayRowHandler.java     |  22 +-
 .../sql/engine/framework/TestBuilders.java         |   1 -
 .../internal/sql/engine/framework/TestNode.java    |   1 -
 .../sql/engine/util/HashFunctionsTest.java         |   2 +-
 .../internal/sql/engine/util/SqlTestUtils.java     |   3 +-
 23 files changed, 874 insertions(+), 78 deletions(-)

diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlOperatorsTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlOperatorsTest.java
index 3c5f6b0596..b4b03f8c6c 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlOperatorsTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlOperatorsTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.sql.engine;
 
-import java.math.BigDecimal;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.Period;
@@ -193,13 +192,15 @@ public class ItSqlOperatorsTest extends 
ClusterPerClassIntegrationTest {
         assertExpression("COT(1)").returns(1.0d / Math.tan(1)).check();
         assertExpression("DEGREES(1)").returns(Math.toDegrees(1)).check();
         assertExpression("RADIANS(1)").returns(Math.toRadians(1)).check();
-        assertExpression("ROUND(1.7)").returns(BigDecimal.valueOf(2)).check();
+        // TODO https://issues.apache.org/jira/browse/IGNITE-20311
+        // 
assertExpression("ROUND(1.7)").returns(BigDecimal.valueOf(2)).check();
         assertExpression("SIGN(-5)").returns(-1).check();
         assertExpression("SIN(1)").returns(Math.sin(1)).check();
         assertExpression("SINH(1)").returns(Math.sinh(1)).check();
         assertExpression("TAN(1)").returns(Math.tan(1)).check();
         assertExpression("TANH(1)").returns(Math.tanh(1)).check();
-        
assertExpression("TRUNCATE(1.7)").returns(BigDecimal.valueOf(1)).check();
+        // TODO https://issues.apache.org/jira/browse/IGNITE-20311
+        // 
assertExpression("TRUNCATE(1.7)").returns(BigDecimal.valueOf(1)).check();
         assertExpression("PI").returns(Math.PI).check();
     }
 
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/InternalTuple.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/InternalTuple.java
index 642f5ccdd2..c3e242e7bc 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/InternalTuple.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/InternalTuple.java
@@ -46,6 +46,22 @@ public interface InternalTuple {
      */
     boolean hasNullValue(int col);
 
+    /**
+     * Reads value for specified column.
+     *
+     * @param col Column index.
+     * @return Column value.
+     */
+    boolean booleanValue(int col);
+
+    /**
+     * Reads value for specified column.
+     *
+     * @param col Column index.
+     * @return Column value.
+     */
+    Boolean booleanValueBoxed(int col);
+
     /**
      * Reads value for specified column.
      *
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index a6b496c6ad..8c7bc44509 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -53,7 +53,6 @@ import org.apache.ignite.internal.manager.EventListener;
 import org.apache.ignite.internal.metrics.MetricManager;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.schema.SchemaManager;
-import org.apache.ignite.internal.sql.engine.exec.ArrayRowHandler;
 import org.apache.ignite.internal.sql.engine.exec.ExchangeServiceImpl;
 import org.apache.ignite.internal.sql.engine.exec.ExecutableTableRegistryImpl;
 import 
org.apache.ignite.internal.sql.engine.exec.ExecutionDependencyResolverImpl;
@@ -64,6 +63,7 @@ import 
org.apache.ignite.internal.sql.engine.exec.MailboxRegistryImpl;
 import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
 import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl;
 import org.apache.ignite.internal.sql.engine.exec.QueryValidationException;
+import org.apache.ignite.internal.sql.engine.exec.SqlRowHandler;
 import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandlerWrapper;
 import org.apache.ignite.internal.sql.engine.message.MessageServiceImpl;
 import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
@@ -282,7 +282,7 @@ public class SqlQueryProcessor implements QueryProcessor {
                 sqlSchemaManager,
                 ddlCommandHandler,
                 taskExecutor,
-                ArrayRowHandler.INSTANCE,
+                SqlRowHandler.INSTANCE,
                 mailboxRegistry,
                 exchangeService,
                 dependencyResolver
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RowHandler.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RowHandler.java
index 28ccfafe8b..406b0dc7aa 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RowHandler.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RowHandler.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.sql.engine.exec;
 
 import java.nio.ByteBuffer;
+import org.apache.ignite.internal.schema.row.InternalTuple;
 import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
 import org.jetbrains.annotations.Nullable;
 
@@ -44,6 +45,21 @@ public interface RowHandler<RowT> {
     /** Concatenate two rows. */
     RowT concat(RowT left, RowT right);
 
+    /**
+     * Creates a new row containing only the fields specified in the provided 
mapping.
+     *
+     * <p>For example:
+     * <pre>
+     *    source row [5, 6, 7, 8] apply mapping [1, 3]
+     *    result row will be [6, 8]
+     * </pre>
+     *
+     * @param row Source row.
+     * @param mapping Target field indexes.
+     * @return A new row with fields from the specified mapping.
+     */
+    RowT map(RowT row, int[] mapping);
+
     /** Return column count contained in the incoming row. */
     int columnCount(RowT row);
 
@@ -87,5 +103,13 @@ public interface RowHandler<RowT> {
          * @return Instantiation defined representation.
          */
         RowT create(ByteBuffer raw);
+
+        /**
+         * Create row using incoming binary tuple.
+         *
+         * @param tuple {@link InternalTuple} representation.
+         * @return Instantiation defined representation.
+         */
+        RowT create(InternalTuple tuple);
     }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlRowHandler.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlRowHandler.java
new file mode 100644
index 0000000000..3ff3d5fcf9
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlRowHandler.java
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import static 
org.apache.ignite.internal.sql.engine.exec.exp.ExpressionFactoryImpl.UNSPECIFIED_VALUE_PLACEHOLDER;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.BitSet;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.DecimalNativeType;
+import org.apache.ignite.internal.schema.InvalidTypeException;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypeSpec;
+import org.apache.ignite.internal.schema.row.InternalTuple;
+import org.apache.ignite.internal.sql.engine.exec.SqlRowHandler.RowWrapper;
+import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
+import org.apache.ignite.internal.sql.engine.exec.row.RowSchema.Builder;
+import org.apache.ignite.internal.sql.engine.exec.row.RowSchemaTypes;
+import org.apache.ignite.internal.sql.engine.exec.row.TypeSpec;
+import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.sql.engine.util.TypeUtils;
+import org.apache.ignite.lang.IgniteStringBuilder;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Handler that uses a {@link RowWrapper wrapper} to operate with two types of 
row implementations.
+ *
+ * <ul>
+ *     <li>{@link ObjectsArrayRowWrapper Objects array}</li>
+ *     <li>{@link BinaryTupleRowWrapper Binary tuple}</li>
+ * </ul>
+ *
+ * <p>Each kind of rows is serialized to the same binary tuple format
+ * using the {@link #toByteBuffer(RowWrapper) toByteBuffer} method.
+ *
+ * <p>Factory methods {@link RowFactory#create(InternalTuple) 
wrap(InternalTuple)} and
+ * {@link RowFactory#create(ByteBuffer) create(ByteBuffer)} allow create rows 
without
+ * any additional conversions. But the fields in binary tuple must match the
+ * factory {@link RowSchema row schema}.
+ */
+public class SqlRowHandler implements RowHandler<RowWrapper> {
+    public static final RowHandler<RowWrapper> INSTANCE = new SqlRowHandler();
+
+    private SqlRowHandler() {
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public @Nullable Object get(int field, RowWrapper row) {
+        return row.get(field);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void set(int field, RowWrapper row, @Nullable Object val) {
+        row.set(field, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public RowWrapper concat(RowWrapper left, RowWrapper right) {
+        int leftLen = left.columnsCount();
+        int rightLen = right.columnsCount();
+        List<TypeSpec> leftTypes = left.rowSchema().fields();
+        List<TypeSpec> rightTypes = right.rowSchema().fields();
+
+        Object[] values = new Object[leftLen + rightLen];
+        Builder schemaBuilder = RowSchema.builder();
+
+        for (int i = 0; i < leftLen; i++) {
+            values[i] = left.get(i);
+            schemaBuilder.addField(leftTypes.get(i));
+        }
+
+        for (int i = 0; i < rightLen; i++) {
+            values[leftLen + i] = right.get(i);
+            schemaBuilder.addField(rightTypes.get(i));
+        }
+
+        return new ObjectsArrayRowWrapper(schemaBuilder.build(), values);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public RowWrapper map(RowWrapper row, int[] mapping) {
+        Object[] fields = new Object[mapping.length];
+
+        for (int i = 0; i < mapping.length; i++) {
+            fields[i] = row.get(mapping[i]);
+        }
+
+        return new ObjectsArrayRowWrapper(row.rowSchema(), fields);
+    }
+
+    @Override
+    public int columnCount(RowWrapper row) {
+        return row.columnsCount();
+    }
+
+    @Override
+    public String toString(RowWrapper row) {
+        IgniteStringBuilder buf = new IgniteStringBuilder("Row[");
+        int maxIdx = columnCount(row) - 1;
+
+        for (int i = 0; i <= maxIdx; i++) {
+            buf.app(row.get(i));
+
+            if (i != maxIdx) {
+                buf.app(", ");
+            }
+        }
+
+        return buf.app(']').toString();
+    }
+
+    @Override
+    public ByteBuffer toByteBuffer(RowWrapper row) {
+        return row.toByteBuffer();
+    }
+
+    @Override
+    public RowFactory<RowWrapper> factory(RowSchema rowSchema) {
+        int schemaLen = rowSchema.fields().size();
+
+        return new RowFactory<>() {
+            /** {@inheritDoc} */
+            @Override
+            public RowHandler<RowWrapper> handler() {
+                return SqlRowHandler.this;
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            public RowWrapper create() {
+                return create(new Object[schemaLen]);
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            public RowWrapper create(Object... fields) {
+                assert fields.length == schemaLen;
+
+                return new ObjectsArrayRowWrapper(rowSchema, fields);
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            public RowWrapper create(ByteBuffer buf) {
+                return create(new BinaryTuple(schemaLen, buf));
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            public RowWrapper create(InternalTuple tuple) {
+                assert schemaLen == tuple.elementCount();
+
+                return new BinaryTupleRowWrapper(rowSchema, tuple);
+            }
+        };
+    }
+
+    /**
+     * Provides the ability for a single {@link RowHandler} instance to 
interact with multiple row implementations.
+     */
+    public abstract static class RowWrapper {
+        abstract int columnsCount();
+
+        abstract RowSchema rowSchema();
+
+        abstract @Nullable Object get(int field);
+
+        abstract void set(int field, Object value);
+
+        abstract ByteBuffer toByteBuffer();
+    }
+
+    /**
+     * Wrapper over an array of objects.
+     */
+    private static class ObjectsArrayRowWrapper extends RowWrapper {
+        private final RowSchema rowSchema;
+        private final Object[] row;
+
+        ObjectsArrayRowWrapper(RowSchema rowSchema, Object[] row) {
+            this.rowSchema = rowSchema;
+            this.row = row;
+        }
+
+        @Override
+        int columnsCount() {
+            return row.length;
+        }
+
+        @Override
+        @Nullable Object get(int field) {
+            return row[field];
+        }
+
+        @Override
+        void set(int field, @Nullable Object value) {
+            row[field] = value;
+        }
+
+        @Override
+        ByteBuffer toByteBuffer() {
+            BinaryTupleBuilder tupleBuilder = new 
BinaryTupleBuilder(row.length);
+
+            for (int i = 0; i < row.length; i++) {
+                Object value = row[i];
+
+                assert value != UNSPECIFIED_VALUE_PLACEHOLDER : "Invalid row 
value.";
+
+                appendValue(tupleBuilder, rowSchema.fields().get(i), value);
+            }
+
+            return tupleBuilder.build();
+        }
+
+        @Override
+        RowSchema rowSchema() {
+            return rowSchema;
+        }
+
+        private void appendValue(BinaryTupleBuilder builder, TypeSpec 
schemaType, @Nullable Object value) {
+            if (value == null) {
+                builder.appendNull();
+
+                return;
+            }
+
+            NativeType nativeType = RowSchemaTypes.toNativeType(schemaType);
+
+            value = TypeUtils.fromInternal(value, 
NativeTypeSpec.toClass(nativeType.spec(), schemaType.isNullable()));
+
+            assert value != null : nativeType;
+
+            switch (nativeType.spec()) {
+                case BOOLEAN:
+                    builder.appendBoolean((boolean) value);
+                    break;
+
+                case INT8:
+                    builder.appendByte((byte) value);
+                    break;
+
+                case INT16:
+                    builder.appendShort((short) value);
+                    break;
+
+                case INT32:
+                    builder.appendInt((int) value);
+                    break;
+
+                case INT64:
+                    builder.appendLong((long) value);
+                    break;
+
+                case FLOAT:
+                    builder.appendFloat((float) value);
+                    break;
+
+                case DOUBLE:
+                    builder.appendDouble((double) value);
+                    break;
+
+                case NUMBER:
+                    builder.appendNumberNotNull((BigInteger) value);
+                    break;
+
+                case DECIMAL:
+                    builder.appendDecimalNotNull((BigDecimal) value, 
((DecimalNativeType) nativeType).scale());
+                    break;
+
+                case UUID:
+                    builder.appendUuidNotNull((UUID) value);
+                    break;
+
+                case BYTES:
+                    builder.appendBytesNotNull((byte[]) value);
+                    break;
+
+                case STRING:
+                    builder.appendStringNotNull((String) value);
+                    break;
+
+                case BITMASK:
+                    builder.appendBitmaskNotNull((BitSet) value);
+                    break;
+
+                case DATE:
+                    builder.appendDateNotNull((LocalDate) value);
+                    break;
+
+                case TIME:
+                    builder.appendTimeNotNull((LocalTime) value);
+                    break;
+
+                case DATETIME:
+                    builder.appendDateTimeNotNull((LocalDateTime) value);
+                    break;
+
+                case TIMESTAMP:
+                    builder.appendTimestampNotNull((Instant) value);
+                    break;
+
+                default:
+                    throw new UnsupportedOperationException("Unknown type " + 
nativeType);
+            }
+        }
+    }
+
+    /**
+     * Wrapper over an {@link BinaryTuple}.
+     *
+     * <p>Since {@link BinaryTuple binary tuple} is immutable this wrapper 
doesn't support {@link #set(int, Object)} operation.
+     */
+    private static class BinaryTupleRowWrapper extends RowWrapper {
+        private final RowSchema rowSchema;
+        private final InternalTuple tuple;
+
+        BinaryTupleRowWrapper(RowSchema rowSchema, InternalTuple tuple) {
+            this.rowSchema = rowSchema;
+            this.tuple = tuple;
+        }
+
+        @Override
+        int columnsCount() {
+            return tuple.elementCount();
+        }
+
+        @Override
+        @Nullable Object get(int field) {
+            NativeType nativeType = 
RowSchemaTypes.toNativeType(rowSchema.fields().get(field));
+
+            if (nativeType == null) {
+                return null;
+            }
+
+            Object value = readValue(tuple, nativeType, field);
+
+            if (value == null) {
+                return null;
+            }
+
+            return TypeUtils.toInternal(value, 
Commons.nativeTypeToClass(nativeType));
+        }
+
+        @Override
+        void set(int field, @Nullable Object value) {
+            // TODO https://issues.apache.org/jira/browse/IGNITE-20356
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        ByteBuffer toByteBuffer() {
+            return tuple.byteBuffer();
+        }
+
+        @Override
+        RowSchema rowSchema() {
+            return rowSchema;
+        }
+
+        private @Nullable Object readValue(InternalTuple tuple, NativeType 
nativeType, int fieldIndex) {
+            switch (nativeType.spec()) {
+                case BOOLEAN: return tuple.booleanValueBoxed(fieldIndex);
+                case INT8: return tuple.byteValueBoxed(fieldIndex);
+                case INT16: return tuple.shortValueBoxed(fieldIndex);
+                case INT32: return tuple.intValueBoxed(fieldIndex);
+                case INT64: return tuple.longValueBoxed(fieldIndex);
+                case FLOAT: return tuple.floatValueBoxed(fieldIndex);
+                case DOUBLE: return tuple.doubleValueBoxed(fieldIndex);
+                case DECIMAL: return tuple.decimalValue(fieldIndex, 
((DecimalNativeType) nativeType).scale());
+                case UUID: return tuple.uuidValue(fieldIndex);
+                case STRING: return tuple.stringValue(fieldIndex);
+                case BYTES: return tuple.bytesValue(fieldIndex);
+                case BITMASK: return tuple.bitmaskValue(fieldIndex);
+                case NUMBER: return tuple.numberValue(fieldIndex);
+                case DATE: return tuple.dateValue(fieldIndex);
+                case TIME: return tuple.timeValue(fieldIndex);
+                case DATETIME: return tuple.dateTimeValue(fieldIndex);
+                case TIMESTAMP: return tuple.timestampValue(fieldIndex);
+                default: throw new InvalidTypeException("Unknown element type: 
" + nativeType);
+            }
+        }
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TableRowConverterImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TableRowConverterImpl.java
index dc81c12c76..5c58079614 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TableRowConverterImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TableRowConverterImpl.java
@@ -18,13 +18,15 @@
 package org.apache.ignite.internal.sql.engine.exec;
 
 import java.util.BitSet;
+import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
 import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryRowConverter;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.schema.row.Row;
-import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
 import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
-import org.apache.ignite.internal.sql.engine.util.TypeUtils;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -38,11 +40,15 @@ public class TableRowConverterImpl implements 
TableRowConverter {
 
     private final TableDescriptor desc;
 
+    private final BinaryTupleSchema binaryTupleSchema;
+
     /** Constructor. */
     public TableRowConverterImpl(SchemaRegistry schemaRegistry, 
SchemaDescriptor schemaDescriptor, TableDescriptor desc) {
         this.schemaRegistry = schemaRegistry;
         this.schemaDescriptor = schemaDescriptor;
         this.desc = desc;
+
+        this.binaryTupleSchema = 
BinaryTupleSchema.createRowSchema(schemaDescriptor);
     }
 
     /** {@inheritDoc} */
@@ -53,30 +59,36 @@ public class TableRowConverterImpl implements 
TableRowConverter {
             RowHandler.RowFactory<RowT> factory,
             @Nullable BitSet requiredColumns
     ) {
-        RowHandler<RowT> handler = factory.handler();
+        Row row = schemaRegistry.resolve(binaryRow, schemaDescriptor);
 
-        assert handler == ectx.rowHandler();
+        BinaryTuple tuple = requiredColumns == null
+                ? allColumnsTuple(row, binaryTupleSchema)
+                : requiredColumnsTuple(row, binaryTupleSchema, 
requiredColumns);
 
-        RowT res = factory.create();
+        return factory.create(tuple);
+    }
 
-        assert handler.columnCount(res) == (requiredColumns == null ? 
desc.columnsCount() : requiredColumns.cardinality());
+    private BinaryTuple allColumnsTuple(Row row, BinaryTupleSchema 
binarySchema) {
+        BinaryTupleBuilder tupleBuilder = new 
BinaryTupleBuilder(desc.columnsCount());
 
-        Row row = schemaRegistry.resolve(binaryRow, schemaDescriptor);
+        for (int i = 0; i < desc.columnsCount(); i++) {
+            int index = desc.columnDescriptor(i).physicalIndex();
+
+            BinaryRowConverter.appendValue(tupleBuilder, 
binarySchema.element(index), binarySchema.value(row, index));
+        }
+
+        return new BinaryTuple(tupleBuilder.numElements(), 
tupleBuilder.build());
+    }
 
-        if (requiredColumns == null) {
-            for (int i = 0; i < desc.columnsCount(); i++) {
-                ColumnDescriptor colDesc = desc.columnDescriptor(i);
+    private BinaryTuple requiredColumnsTuple(Row row, BinaryTupleSchema 
binarySchema, BitSet requiredColumns) {
+        BinaryTupleBuilder tupleBuilder = new 
BinaryTupleBuilder(requiredColumns.cardinality());
 
-                handler.set(i, res, 
TypeUtils.toInternal(row.value(colDesc.physicalIndex())));
-            }
-        } else {
-            for (int i = 0, j = requiredColumns.nextSetBit(0); j != -1; j = 
requiredColumns.nextSetBit(j + 1), i++) {
-                ColumnDescriptor colDesc = desc.columnDescriptor(j);
+        for (int i = requiredColumns.nextSetBit(0); i != -1; i = 
requiredColumns.nextSetBit(i + 1)) {
+            int index = desc.columnDescriptor(i).physicalIndex();
 
-                handler.set(i, res, 
TypeUtils.toInternal(row.value(colDesc.physicalIndex())));
-            }
+            BinaryRowConverter.appendValue(tupleBuilder, 
binarySchema.element(index), binarySchema.value(row, index));
         }
 
-        return res;
+        return new BinaryTuple(tupleBuilder.numElements(), 
tupleBuilder.build());
     }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImpl.java
index ed38d522c3..700e102f68 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImpl.java
@@ -277,8 +277,17 @@ public class ExpressionFactoryImpl<RowT> implements 
ExpressionFactory<RowT> {
     /** {@inheritDoc} */
     @Override
     public <T> Supplier<T> execute(RexNode node) {
-        RelDataType nodeType = node.getType();
-        RowSchema rowSchema = 
TypeUtils.rowSchemaFromRelTypes(List.of(nodeType));
+        RelDataType exprType = node.getType();
+        List<RelDataType> typesList;
+
+        if (exprType.getSqlTypeName() == SqlTypeName.ROW) {
+            // Convert a row returned from a table function into a list of 
columns.
+            typesList = RelOptUtil.getFieldTypeList(exprType);
+        } else {
+            typesList = List.of(exprType);
+        }
+
+        RowSchema rowSchema = TypeUtils.rowSchemaFromRelTypes(typesList);
 
         RowFactory<RowT> factory = ctx.rowHandler().factory(rowSchema);
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulators.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulators.java
index 458cf56dfc..2ff04c5950 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulators.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulators.java
@@ -88,11 +88,11 @@ public class Accumulators {
             case "SOME":
                 return minMaxFactory(false, call);
             case "SINGLE_VALUE":
-                return SingleVal.FACTORY;
-            case "LITERAL_AGG":
-                return LiteralVal.FACTORY;
+                return singleValueFactory(call);
             case "ANY_VALUE":
-                return AnyVal.FACTORY;
+                return anyValueFactory(call);
+            case "LITERAL_AGG":
+                return 
LiteralVal.newAccumulator(typeFactory.createSqlType(BOOLEAN));
             default:
                 throw new AssertionError(call.getAggregation().getName());
         }
@@ -186,6 +186,26 @@ public class Accumulators {
         }
     }
 
+    private Supplier<Accumulator> singleValueFactory(AggregateCall call) {
+        RelDataType type = call.getType();
+
+        if (type.getSqlTypeName() == ANY && !(type instanceof 
IgniteCustomType)) {
+            throw unsupportedAggregateFunction(call);
+        }
+
+        return SingleVal.newAccumulator(type);
+    }
+
+    private Supplier<Accumulator> anyValueFactory(AggregateCall call) {
+        RelDataType type = call.getType();
+
+        if (type.getSqlTypeName() == ANY && !(type instanceof 
IgniteCustomType)) {
+            throw unsupportedAggregateFunction(call);
+        }
+
+        return AnyVal.newAccumulator(type);
+    }
+
     /**
      * SingleVal.
      * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
@@ -193,7 +213,13 @@ public class Accumulators {
     private static class SingleVal extends AnyVal {
         private boolean touched;
 
-        public static final Supplier<Accumulator> FACTORY = SingleVal::new;
+        private SingleVal(RelDataType type) {
+            super(type);
+        }
+
+        static Supplier<Accumulator> newAccumulator(RelDataType type) {
+            return () -> new SingleVal(type);
+        }
 
         /** {@inheritDoc} */
         @Override
@@ -213,14 +239,12 @@ public class Accumulators {
      * Calcite`s implementation RexImpTable#LiteralAggImplementor.
      */
     private static class LiteralVal extends AnyVal {
-        public static final Supplier<Accumulator> FACTORY = LiteralVal::new;
-
-        /** {@inheritDoc} */
-        @Override
-        public void add(Object... args) {
-            assert args.length == 1 : args.length;
+        private LiteralVal(RelDataType type) {
+            super(type);
+        }
 
-            super.add(args);
+        static Supplier<Accumulator> newAccumulator(RelDataType type) {
+            return () -> new LiteralVal(type);
         }
 
         /** {@inheritDoc} */
@@ -228,12 +252,6 @@ public class Accumulators {
         public Object end() {
             return holder != null;
         }
-
-        /** {@inheritDoc} */
-        @Override
-        public RelDataType returnType(IgniteTypeFactory typeFactory) {
-            return typeFactory.createSqlType(BOOLEAN);
-        }
     }
 
     /**
@@ -242,8 +260,15 @@ public class Accumulators {
     private static class AnyVal implements Accumulator {
         protected Object holder;
 
-        public static final Supplier<Accumulator> FACTORY = AnyVal::new;
+        private final RelDataType type;
 
+        private AnyVal(RelDataType type) {
+            this.type = type;
+        }
+
+        static Supplier<Accumulator> newAccumulator(RelDataType type) {
+            return () -> new AnyVal(type);
+        }
 
         /** {@inheritDoc} */
         @Override
@@ -270,7 +295,7 @@ public class Accumulators {
         /** {@inheritDoc} */
         @Override
         public RelDataType returnType(IgniteTypeFactory typeFactory) {
-            return typeFactory.createSqlType(ANY);
+            return type;
         }
     }
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java
index b1b58a9719..1765e40cb7 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java
@@ -268,15 +268,24 @@ public class ModifyNode<RowT> extends AbstractNode<RowT> 
implements SingleNode<R
 
         RowHandler<RowT> handler = context().rowHandler();
 
-        for (RowT row : rows) {
-            for (int i = 0; i < mapping.length; i++) {
-                if (offset == 0 && i == mapping[i]) {
-                    continue;
-                }
+        int[] targetMapping = applyOffset(mapping, offset);
 
-                handler.set(i, row, handler.get(mapping[i] + offset, row));
-            }
+        rows.replaceAll(row -> handler.map(row, targetMapping));
+    }
+
+    /** Adds the provided offset to each value in the mapping. */
+    private int[] applyOffset(int[] srcMapping, int offset) {
+        if (offset == 0) {
+            return srcMapping;
         }
+
+        int[] targetMapping = new int[srcMapping.length];
+
+        for (int i = 0; i < targetMapping.length; i++) {
+            targetMapping[i] = srcMapping[i] + offset;
+        }
+
+        return targetMapping;
     }
 
     /**
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/row/RowSchemaTypes.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/row/RowSchemaTypes.java
index d66d763f6e..da211f2953 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/row/RowSchemaTypes.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/row/RowSchemaTypes.java
@@ -25,6 +25,7 @@ import java.util.stream.Collectors;
 import org.apache.ignite.internal.schema.NativeType;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.util.Pair;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Defines methods to provide instances of {@link TypeSpec}s.
@@ -82,4 +83,27 @@ public final class RowSchemaTypes {
             return new BaseTypeSpec(nativeType, nullable);
         }
     }
+
+    /**
+     * Convert specified schema {@link TypeSpec type} to {@link NativeType 
native type}.
+     *
+     * @param type Row schema type.
+     * @return Native type or {@code null} if type is a {@link NullTypeSpec}.
+     * @throws IllegalArgumentException If provided type cannot be converted 
to a native type.
+     */
+    public static @Nullable NativeType toNativeType(TypeSpec type) {
+        if (type instanceof NullTypeSpec) {
+            return null;
+        }
+
+        if (!(type instanceof BaseTypeSpec)) {
+            throw new IllegalArgumentException("Unexpected type; " + type);
+        }
+
+        NativeType nativeType = ((BaseTypeSpec) type).nativeType();
+
+        assert nativeType != null : type;
+
+        return nativeType;
+    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/TypeUtils.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/TypeUtils.java
index d09f7a5e89..65ac0da499 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/TypeUtils.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/TypeUtils.java
@@ -668,7 +668,7 @@ public class TypeUtils {
             //  Add collection types support
             throw new IllegalArgumentException("Collection types is not 
supported: " + type);
         } else if (SqlTypeUtil.isArray(type) || type instanceof JavaType) {
-            // TODO Remove after is fixed 
https://issues.apache.org/jira/browse/IGNITE-19992
+            // TODO Remove after is fixed 
https://issues.apache.org/jira/browse/IGNITE-20336
             //  Move SqlTypeUtil.isArray(type) to collections type support 
branch.
             return new BaseTypeSpec(null, true);
         } else {
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index dbd444c0a4..a0fcc755c4 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -69,6 +69,7 @@ import org.apache.ignite.internal.sql.engine.exec.rel.Inbox;
 import org.apache.ignite.internal.sql.engine.exec.rel.Node;
 import org.apache.ignite.internal.sql.engine.exec.rel.Outbox;
 import org.apache.ignite.internal.sql.engine.exec.rel.ScanNode;
+import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
 import org.apache.ignite.internal.sql.engine.framework.NoOpTransaction;
 import org.apache.ignite.internal.sql.engine.framework.TestTable;
 import 
org.apache.ignite.internal.sql.engine.message.ExecutionContextAwareMessage;
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/IdentityDistributionFunctionSelfTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/IdentityDistributionFunctionSelfTest.java
index 2b46f736b7..2d8b5d9437 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/IdentityDistributionFunctionSelfTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/IdentityDistributionFunctionSelfTest.java
@@ -15,23 +15,6 @@
  * limitations under the License.
  */
 
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
 package org.apache.ignite.internal.sql.engine.exec;
 
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -45,6 +28,7 @@ import java.util.List;
 import org.apache.calcite.rel.RelDistribution.Type;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
+import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
 import org.apache.ignite.internal.sql.engine.metadata.ColocationGroup;
 import org.apache.ignite.internal.sql.engine.trait.Destination;
 import 
org.apache.ignite.internal.sql.engine.trait.DistributionFunction.IdentityDistribution;
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
index 43fa095526..0f7b63813e 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
@@ -34,6 +34,7 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
 import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
 import org.apache.ignite.internal.sql.engine.util.Commons;
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
index 5dddf6e991..91146408b3 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
@@ -36,11 +36,12 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.ignite.internal.sql.engine.exec.ArrayRowHandler;
+import org.apache.ignite.internal.schema.row.InternalTuple;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler;
 import org.apache.ignite.internal.sql.engine.exec.TxAttributes;
+import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
 import org.apache.ignite.internal.sql.engine.framework.NoOpTransaction;
 import org.apache.ignite.internal.sql.engine.metadata.FragmentDescription;
 import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
@@ -361,6 +362,11 @@ public abstract class AbstractExecutionTest extends 
IgniteAbstractTest {
             public Object[] create(ByteBuffer raw) {
                 return ByteUtils.fromBytes(raw.array());
             }
+
+            @Override
+            public Object[] create(InternalTuple tuple) {
+                throw new UnsupportedOperationException();
+            }
         };
     }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java
index 29e733486f..bce573e224 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java
@@ -40,9 +40,9 @@ import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.validate.SqlConformanceEnum;
 import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.ignite.internal.sql.engine.exec.ArrayRowHandler;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.sql.engine.exec.exp.ExpressionFactoryImpl;
+import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
 import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
 import org.apache.ignite.internal.sql.engine.util.Commons;
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
index 2d23497d71..c9ab8ffe5e 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
@@ -60,7 +60,6 @@ import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.BinaryTuplePrefix;
 import org.apache.ignite.internal.schema.NativeTypes;
-import org.apache.ignite.internal.sql.engine.exec.ArrayRowHandler;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
@@ -70,6 +69,7 @@ import 
org.apache.ignite.internal.sql.engine.exec.TableRowConverter;
 import org.apache.ignite.internal.sql.engine.exec.TxAttributes;
 import org.apache.ignite.internal.sql.engine.exec.exp.RangeCondition;
 import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
+import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
 import org.apache.ignite.internal.sql.engine.framework.NoOpTransaction;
 import org.apache.ignite.internal.sql.engine.metadata.PartitionWithTerm;
 import 
org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest.TestTableDescriptor;
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/row/SqlRowHandlerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/row/SqlRowHandlerTest.java
new file mode 100644
index 0000000000..4a7bdc7bfe
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/row/SqlRowHandlerTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.row;
+
+import static 
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.generateValueByType;
+import static 
org.apache.ignite.internal.sql.engine.util.TypeUtils.columnType2NativeType;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
+import org.apache.ignite.internal.sql.engine.exec.SqlRowHandler;
+import org.apache.ignite.internal.sql.engine.exec.SqlRowHandler.RowWrapper;
+import org.apache.ignite.internal.sql.engine.exec.row.RowSchema.Builder;
+import org.apache.ignite.internal.sql.engine.util.TypeUtils;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.sql.ColumnType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Named;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Tests for {@link SqlRowHandler}.
+ */
+public class SqlRowHandlerTest extends IgniteAbstractTest {
+    private static final RowHandler<RowWrapper> handler = 
SqlRowHandler.INSTANCE;
+
+    private final long seed = ThreadLocalRandom.current().nextLong();
+
+    private final Random rnd = new Random(seed);
+
+    @BeforeEach
+    void printSeed() {
+        log.info("Using seed: " + seed);
+    }
+
+    @Test
+    public void testBytebufferSerialization() {
+        List<ColumnType> columnTypes = columnTypes();
+        Object[] sourceData = values(columnTypes);
+        RowSchema schema = rowSchema(columnTypes, sourceData);
+
+        int elementsCount = schema.fields().size();
+
+        RowFactory<RowWrapper> factory = handler.factory(schema);
+        RowWrapper src = factory.create(sourceData);
+
+        // Serialization to binary tuple representation.
+        ByteBuffer buf = handler.toByteBuffer(src);
+
+        BinaryTuple tuple = new BinaryTuple(elementsCount, buf);
+        RowWrapper destWrap = factory.create(tuple);
+        RowWrapper dest = factory.create(buf);
+
+        for (int i = 0; i < elementsCount; i++) {
+            String msg = schema.fields().get(i).toString();
+
+            assertThat(msg, handler.get(i, src), equalTo(sourceData[i]));
+
+            // Binary tuple wrapper must return data in internal format.
+            Object expected = TypeUtils.toInternal(sourceData[i]);
+
+            assertThat(msg, handler.get(i, dest), equalTo(expected));
+            assertThat(msg, handler.get(i, destWrap), equalTo(expected));
+        }
+    }
+
+    @ParameterizedTest(name = "{0} - {1}")
+    @MethodSource("concatTestArguments")
+    public void testConcat(boolean leftTupleRequired, boolean 
rightTupleRequired) {
+        ConcatTestParameters params = new 
ConcatTestParameters(leftTupleRequired, rightTupleRequired);
+
+        RowWrapper concatenated = handler.concat(params.left, params.right);
+
+        int leftLen = params.leftData.length;
+        int rightLen = params.rightData.length;
+        int totalElementsCount = leftLen + rightLen;
+
+        assertThat(handler.columnCount(concatenated), 
equalTo(totalElementsCount));
+
+        // Build combined schema.
+        Builder builder = RowSchema.builder();
+        params.leftSchema.fields().forEach(builder::addField);
+        params.rightSchema.fields().forEach(builder::addField);
+
+        RowSchema concatenatedSchema = builder.build();
+
+        // Serialize.
+        ByteBuffer buf = handler.toByteBuffer(concatenated);
+
+        // Wrap into row.
+        RowWrapper result = handler.factory(concatenatedSchema).create(new 
BinaryTuple(totalElementsCount, buf));
+
+        for (int i = 0; i < leftLen; i++) {
+            assertThat(handler.get(i, result), 
equalTo(TypeUtils.toInternal(params.leftData[i])));
+        }
+
+        for (int i = 0; i < rightLen; i++) {
+            assertThat(handler.get(leftLen + i, result), 
equalTo(TypeUtils.toInternal(params.rightData[i])));
+        }
+    }
+
+    @Test
+    public void testMap() {
+        List<ColumnType> columnTypes = List.of(
+                ColumnType.INT32,
+                ColumnType.STRING,
+                ColumnType.BOOLEAN,
+                ColumnType.INT32,
+                ColumnType.DOUBLE,
+                ColumnType.STRING
+        );
+
+        int[] mapping = {3, 5};
+
+        Object[] sourceData = values(columnTypes);
+        RowSchema schema = rowSchema(columnTypes, sourceData);
+
+        RowFactory<RowWrapper> factory = handler.factory(schema);
+
+        RowWrapper srcRow = factory.create(sourceData);
+        RowWrapper srcBinRow = factory.create(new 
BinaryTuple(handler.columnCount(srcRow), handler.toByteBuffer(srcRow)));
+
+        RowWrapper mappedRow = handler.map(srcRow, mapping);
+        RowWrapper mappedFromBinRow = handler.map(srcBinRow, mapping);
+
+        RowSchema mappedSchema = rowSchema(columnTypes.subList(0, 
mapping.length), Arrays.copyOf(sourceData, mapping.length));
+        RowWrapper deserializedMappedBinRow = 
handler.factory(mappedSchema).create(handler.toByteBuffer(mappedFromBinRow));
+
+        assertThat(handler.columnCount(mappedRow), equalTo(mapping.length));
+        assertThat(handler.columnCount(mappedFromBinRow), 
equalTo(mapping.length));
+
+        for (int i = 0; i < mapping.length; i++) {
+            Object expected = handler.get(mapping[i], srcRow);
+
+            assertThat(handler.get(i, mappedRow), equalTo(expected));
+            assertThat(handler.get(i, mappedFromBinRow), equalTo(expected));
+            assertThat(handler.get(i, deserializedMappedBinRow), 
equalTo(expected));
+        }
+    }
+
+    private static Stream<Arguments> concatTestArguments() {
+        return Stream.of(
+                Arguments.of(Named.of("array", false), Named.of("array", 
false)),
+                Arguments.of(Named.of("array", false), Named.of("tuple", 
true)),
+                Arguments.of(Named.of("tuple", true), Named.of("array", 
false)),
+                Arguments.of(Named.of("tuple", true), Named.of("tuple", true))
+        );
+    }
+
+    private RowSchema rowSchema(List<ColumnType> columnTypes, Object[] values) 
{
+        Builder schemaBuilder = RowSchema.builder();
+
+        for (int i = 0; i < values.length; i++) {
+            ColumnType type = columnTypes.get(i);
+
+            if (type == ColumnType.NULL) {
+                schemaBuilder.addField(new NullTypeSpec());
+
+                continue;
+            }
+
+            NativeType nativeType = values[i] == null
+                    ? columnType2NativeType(type, 9, 3, 20)
+                    : NativeTypes.fromObject(values[i]);
+
+            schemaBuilder.addField(nativeType, values[i] == null || 
rnd.nextBoolean());
+        }
+
+        return schemaBuilder.build();
+    }
+
+    private Object[] values(List<ColumnType> columnTypes) {
+        Object[] values = new Object[columnTypes.size()];
+        int baseValue = rnd.nextInt();
+
+        for (int i = 0; i < values.length; i++) {
+            ColumnType type = columnTypes.get(i);
+
+            values[i] = type == ColumnType.NULL ? null : 
generateValueByType(baseValue, type);
+        }
+
+        return values;
+    }
+
+    private List<ColumnType> columnTypes() {
+        List<ColumnType> columnTypes = new ArrayList<>(
+                // TODO Include ignored types to test after 
https://issues.apache.org/jira/browse/IGNITE-15200
+                EnumSet.complementOf(EnumSet.of(ColumnType.PERIOD, 
ColumnType.DURATION))
+        );
+
+        Collections.shuffle(columnTypes, rnd);
+
+        return columnTypes;
+    }
+
+    private final class ConcatTestParameters {
+        final RowSchema leftSchema;
+        final RowSchema rightSchema;
+        final Object[] leftData;
+        final Object[] rightData;
+        final RowWrapper left;
+        final RowWrapper right;
+
+        ConcatTestParameters(boolean leftTupleRequired, boolean 
rightTupleRequired) {
+            List<ColumnType> columnTypes1 = columnTypes();
+            List<ColumnType> columnTypes2 = columnTypes();
+
+            leftData = values(columnTypes1);
+            rightData = values(columnTypes2);
+            leftSchema = rowSchema(columnTypes1, leftData);
+            rightSchema = rowSchema(columnTypes2, rightData);
+
+            RowFactory<RowWrapper> factory1 = handler.factory(leftSchema);
+            RowFactory<RowWrapper> factory2 = handler.factory(rightSchema);
+
+            RowWrapper left = factory1.create(leftData);
+            RowWrapper right = factory2.create(rightData);
+
+            this.left = leftTupleRequired ? 
factory1.create(handler.toByteBuffer(left)) : left;
+            this.right = rightTupleRequired ? 
factory2.create(handler.toByteBuffer(right)) : right;
+        }
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ArrayRowHandler.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ArrayRowHandler.java
similarity index 82%
rename from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ArrayRowHandler.java
rename to 
modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ArrayRowHandler.java
index 13d4ac5bd2..9ff00ae9fa 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ArrayRowHandler.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ArrayRowHandler.java
@@ -15,10 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.exec;
+package org.apache.ignite.internal.sql.engine.framework;
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import org.apache.ignite.internal.schema.row.InternalTuple;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler;
 import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
 import org.apache.ignite.internal.util.ArrayUtils;
 import org.apache.ignite.internal.util.ByteUtils;
@@ -50,6 +52,18 @@ public class ArrayRowHandler implements RowHandler<Object[]> 
{
         return ArrayUtils.concat(left, right);
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public Object[] map(Object[] row, int[] mapping) {
+        Object[] newRow = new Object[mapping.length];
+
+        for (int i = 0; i < mapping.length; i++) {
+            newRow[i] = row[mapping[i]];
+        }
+
+        return newRow;
+    }
+
     /** {@inheritDoc} */
     @Override
     public int columnCount(Object[] row) {
@@ -99,6 +113,12 @@ public class ArrayRowHandler implements 
RowHandler<Object[]> {
             public Object[] create(ByteBuffer raw) {
                 return ByteUtils.fromBytes(raw.array());
             }
+
+            /** {@inheritDoc} */
+            @Override
+            public Object[] create(InternalTuple tuple) {
+                throw new UnsupportedOperationException();
+            }
         };
     }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index 6131fef933..9e6b32c6fd 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -34,7 +34,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.schema.NativeType;
-import org.apache.ignite.internal.sql.engine.exec.ArrayRowHandler;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
 import 
org.apache.ignite.internal.sql.engine.exec.TestExecutableTableRegistry.ColocationGroupProvider;
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
index 59288ca9cb..5df6fc04fb 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
@@ -32,7 +32,6 @@ import org.apache.calcite.tools.Frameworks;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.metrics.MetricManager;
 import org.apache.ignite.internal.sql.engine.QueryCancel;
-import org.apache.ignite.internal.sql.engine.exec.ArrayRowHandler;
 import org.apache.ignite.internal.sql.engine.exec.ExchangeService;
 import org.apache.ignite.internal.sql.engine.exec.ExchangeServiceImpl;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionDependencyResolver;
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/HashFunctionsTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/HashFunctionsTest.java
index f771433701..7c4ae04b6d 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/HashFunctionsTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/HashFunctionsTest.java
@@ -20,8 +20,8 @@ package org.apache.ignite.internal.sql.engine.util;
 import java.util.Arrays;
 import org.apache.ignite.internal.schema.NativeType;
 import org.apache.ignite.internal.schema.NativeTypes;
-import org.apache.ignite.internal.sql.engine.exec.ArrayRowHandler;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
 import 
org.apache.ignite.internal.sql.engine.util.HashFunctionFactory.RowHashFunction;
 import 
org.apache.ignite.internal.sql.engine.util.HashFunctionFactoryImpl.SimpleHashFunction;
 import 
org.apache.ignite.internal.sql.engine.util.HashFunctionFactoryImpl.TypesAwareHashFunction;
diff --git 
a/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/SqlTestUtils.java
 
b/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/SqlTestUtils.java
index 25d538c949..df7a1a13e2 100644
--- 
a/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/SqlTestUtils.java
+++ 
b/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/SqlTestUtils.java
@@ -32,6 +32,7 @@ import java.time.LocalTime;
 import java.time.Period;
 import java.time.ZoneId;
 import java.time.ZonedDateTime;
+import java.util.BitSet;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.calcite.sql.type.SqlTypeName;
@@ -172,7 +173,7 @@ public class SqlTestUtils {
             case UUID:
                 return new UUID(base, base);
             case BITMASK:
-                return new byte[]{(byte) base};
+                return BitSet.valueOf(BigInteger.valueOf(base).toByteArray());
             case DURATION:
                 return Duration.ofNanos(base);
             case DATETIME:

Reply via email to