This is an automated email from the ASF dual-hosted git repository.

korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 131c9b4459 IGNITE-20618 Sql. Degradation of SELECT operations 
performance over time (#2728)
131c9b4459 is described below

commit 131c9b4459639e4b7a6ec5985e142fe25b4fccf2
Author: korlov42 <[email protected]>
AuthorDate: Wed Oct 25 10:38:11 2023 +0300

    IGNITE-20618 Sql. Degradation of SELECT operations performance over time 
(#2728)
---
 .../internal/binarytuple/BinaryTupleBuilder.java   |  17 ++-
 .../ignite/internal/schema/BinaryTuplePrefix.java  | 102 +++++++++++++
 .../internal/schema/BinaryTuplePrefixTest.java     |  77 ++++++++++
 .../internal/sql/engine/exec/ExchangeService.java  |   4 +-
 .../sql/engine/exec/ExchangeServiceImpl.java       |   4 +-
 .../engine/exec/ExecutableTableRegistryImpl.java   |   2 +-
 .../internal/sql/engine/exec/RowConverter.java     | 124 ----------------
 .../internal/sql/engine/exec/RowHandler.java       |  16 +-
 .../sql/engine/exec/ScannableTableImpl.java        |  63 ++++----
 .../internal/sql/engine/exec/SqlRowHandler.java    |  80 +++++++---
 .../ignite/internal/sql/engine/exec/rel/Inbox.java |   8 +-
 .../internal/sql/engine/exec/rel/Outbox.java       |  16 +-
 .../sql/engine/message/QueryBatchMessage.java      |   4 +-
 .../sql/engine/exec/rel/AbstractExecutionTest.java |   7 -
 .../sql/engine/exec/rel/ExchangeExecutionTest.java |  17 ++-
 .../sql/engine/exec/rel/ExecutionTest.java         |  28 ----
 .../engine/exec/rel/ScannableTableSelfTest.java    |  12 +-
 .../exec/rel/TableScanNodeExecutionTest.java       |   6 +-
 .../sql/engine/exec/row/SqlRowHandlerTest.java     |  44 +++---
 .../sql/engine/framework/ArrayRowHandler.java      | 165 +++++++++++++++++++--
 20 files changed, 504 insertions(+), 292 deletions(-)

diff --git 
a/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleBuilder.java
 
b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleBuilder.java
index c651536d6b..ae681d0d6a 100644
--- 
a/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleBuilder.java
+++ 
b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleBuilder.java
@@ -82,11 +82,24 @@ public class BinaryTupleBuilder {
      * @param totalValueSize Total estimated length of non-NULL values, -1 if 
not known.
      */
     public BinaryTupleBuilder(int numElements, int totalValueSize) {
+        this(numElements, totalValueSize, true);
+    }
+
+    /**
+     * Creates a builder.
+     *
+     * @param numElements Number of tuple elements.
+     * @param totalValueSize Total estimated length of non-NULL values, -1 if 
not known.
+     * @param exactEstimate Whether the total size is exact estimate or 
approximate. The
+     *      difference here is with exact estimate allocation will be optimal, 
while with
+     *      approximate estimate some excess allocation is possible.
+     */
+    public BinaryTupleBuilder(int numElements, int totalValueSize, boolean 
exactEstimate) {
         this.numElements = numElements;
 
         entryBase = BinaryTupleCommon.HEADER_SIZE;
 
-        if (totalValueSize < 0) {
+        if (totalValueSize < 0 || !exactEstimate) {
             entrySize = Integer.BYTES;
         } else {
             entrySize = 
BinaryTupleCommon.flagsToEntrySize(BinaryTupleCommon.valueSizeToFlags(totalValueSize));
@@ -359,7 +372,7 @@ public class BinaryTupleBuilder {
      * @param value Element value.
      * @return {@code this} for chaining.
      */
-    public BinaryTupleBuilder appendBytes(byte[] value) {
+    public BinaryTupleBuilder appendBytes(byte @Nullable [] value) {
         return value == null ? appendNull() : appendBytesNotNull(value);
     }
 
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTuplePrefix.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTuplePrefix.java
index cecbb140d4..041d46a135 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTuplePrefix.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTuplePrefix.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.schema;
 import static 
org.apache.ignite.internal.binarytuple.BinaryTupleCommon.PREFIX_FLAG;
 
 import java.nio.ByteBuffer;
+import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
 import org.apache.ignite.internal.binarytuple.BinaryTuplePrefixBuilder;
 import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
 import org.apache.ignite.internal.lang.InternalTuple;
@@ -51,6 +52,25 @@ public class BinaryTuplePrefix extends BinaryTupleReader 
implements InternalTupl
         super(elementCount, buffer);
     }
 
+    /**
+     * Creates a prefix from provided {@link BinaryTuple}. If given tuple has 
lesser or equal number of
+     * columns, then all elements will be used in resulting prefix. If given 
tuple has more columns, then
+     * excess columns will be truncated.
+     *
+     * @param numElements Number of elements in full schema of prefix.
+     * @param tuple Tuple to create a prefix from.
+     * @return Prefix, created from provided tuple with regards to desired 
number of elements.
+     */
+    public static BinaryTuplePrefix fromBinaryTuple(int numElements, 
BinaryTuple tuple) {
+        if (numElements == tuple.elementCount()) {
+            return entireTuple(tuple);
+        } else if (numElements > tuple.elementCount()) {
+            return expandTuple(numElements, tuple);
+        } else {
+            return truncateTuple(numElements, tuple);
+        }
+    }
+
     /**
      * Creates a prefix that contains all columns from the provided {@link 
BinaryTuple}.
      *
@@ -58,6 +78,10 @@ public class BinaryTuplePrefix extends BinaryTupleReader 
implements InternalTupl
      * @return Prefix, equivalent to the tuple.
      */
     public static BinaryTuplePrefix fromBinaryTuple(BinaryTuple tuple) {
+        return entireTuple(tuple);
+    }
+
+    private static BinaryTuplePrefix entireTuple(BinaryTuple tuple) {
         ByteBuffer tupleBuffer = tuple.byteBuffer();
 
         ByteBuffer prefixBuffer = ByteBuffer.allocate(tupleBuffer.remaining() 
+ Integer.BYTES)
@@ -73,6 +97,84 @@ public class BinaryTuplePrefix extends BinaryTupleReader 
implements InternalTupl
         return new BinaryTuplePrefix(tuple.elementCount(), prefixBuffer);
     }
 
+    private static BinaryTuplePrefix expandTuple(int numElements, BinaryTuple 
tuple) {
+        assert numElements > tuple.elementCount();
+
+        if (tuple.elementCount() == 0) {
+            return new BinaryTuplePrefix(
+                    numElements, new BinaryTuplePrefixBuilder(0, numElements, 
0).build()
+            );
+        }
+
+        int[] dataBeginOffsetHolder = new int[1];
+        int[] dataEndOffsetHolder = new int[1];
+
+        tuple.fetch(0, (index, begin, end) -> dataBeginOffsetHolder[0] = 
begin);
+        tuple.fetch(tuple.elementCount() - 1, (index, begin, end) -> 
dataEndOffsetHolder[0] = end);
+
+        ByteBuffer tupleBuffer = tuple.byteBuffer();
+
+        byte flags = tupleBuffer.get(0);
+        int entrySize = BinaryTupleCommon.flagsToEntrySize(flags);
+
+        int newTupleSize = tupleBuffer.remaining() // size of original tuple
+                // additional space for the offset map to align it with 
desired number of columns
+                + (entrySize * (numElements - tuple.elementCount()))
+                + Integer.BYTES; // actual number of columns in prefix
+
+        ByteBuffer prefixBuffer = ByteBuffer.allocate(newTupleSize)
+                .order(ORDER)
+                .put(tupleBuffer.duplicate().limit(dataBeginOffsetHolder[0])); 
// header
+
+        int payloadEndPosition = dataEndOffsetHolder[0] - 
dataBeginOffsetHolder[0];
+        for (int idx = tuple.elementCount(); idx < numElements; idx++) {
+            switch (entrySize) {
+                case Byte.BYTES:
+                    prefixBuffer.put((byte) payloadEndPosition);
+                    break;
+                case Short.BYTES:
+                    prefixBuffer.putShort((short) payloadEndPosition);
+                    break;
+                case Integer.BYTES:
+                    prefixBuffer.putInt(payloadEndPosition);
+                    break;
+                default:
+                    assert false;
+            }
+        }
+
+        prefixBuffer
+                
.put(tupleBuffer.slice().position(dataBeginOffsetHolder[0]).limit(dataEndOffsetHolder[0]))
 // payload
+                .putInt(tuple.elementCount())
+                .flip();
+
+        prefixBuffer.put(0, (byte) (flags | PREFIX_FLAG));
+
+        return new BinaryTuplePrefix(numElements, prefixBuffer);
+    }
+
+    private static BinaryTuplePrefix truncateTuple(int numElements, 
BinaryTuple tuple) {
+        assert numElements < tuple.elementCount();
+
+        int[] dataBeginOffsetHolder = new int[1];
+        int[] dataEndOffsetHolder = new int[1];
+
+        tuple.fetch(0, (index, begin, end) -> dataBeginOffsetHolder[0] = 
begin);
+        tuple.fetch(numElements - 1, (index, begin, end) -> 
dataEndOffsetHolder[0] = end);
+
+        BinaryTuplePrefixBuilder builder = new BinaryTuplePrefixBuilder(
+                numElements, numElements, dataEndOffsetHolder[0] - 
dataBeginOffsetHolder[0]
+        );
+
+        for (int i = 0; i < numElements; i++) {
+            byte[] valueBytes = tuple.bytesValue(i);
+
+            builder.appendBytes(valueBytes);
+        }
+
+        return new BinaryTuplePrefix(numElements, builder.build());
+    }
+
     @Override
     public int elementCount() {
         ByteBuffer buffer = byteBuffer();
diff --git 
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/BinaryTuplePrefixTest.java
 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/BinaryTuplePrefixTest.java
index 99acfc8770..3c9815e140 100644
--- 
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/BinaryTuplePrefixTest.java
+++ 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/BinaryTuplePrefixTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.schema;
 
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -25,6 +26,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.nio.ByteBuffer;
 import java.time.LocalDate;
+import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
 import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
 import org.apache.ignite.internal.binarytuple.BinaryTuplePrefixBuilder;
 import org.junit.jupiter.api.Test;
@@ -76,6 +78,81 @@ public class BinaryTuplePrefixTest {
         assertThat(prefix.intValue(0), is(Integer.MAX_VALUE));
     }
 
+    @Test
+    public void testCreatePrefixFromBinaryTupleWhichSizeIsLessThanRequired() {
+        int sourceTupleSize = 1;
+
+        ByteBuffer buffer = new BinaryTupleBuilder(sourceTupleSize)
+                .appendInt(10)
+                .build();
+
+        BinaryTuplePrefix prefix = BinaryTuplePrefix.fromBinaryTuple(4, new 
BinaryTuple(sourceTupleSize, buffer));
+
+        assertThat(prefix.elementCount(), equalTo(sourceTupleSize));
+        assertThat(prefix.intValue(0), equalTo(10));
+        assertThat(prefix.hasNullValue(1), equalTo(true));
+        assertThat(prefix.hasNullValue(2), equalTo(true));
+        assertThat(prefix.hasNullValue(3), equalTo(true));
+    }
+
+    @Test
+    public void testCreatePrefixFromBinaryTupleWhichSizeIsEqualToRequired() {
+        int sourceTupleSize = 4;
+
+        ByteBuffer buffer = new BinaryTupleBuilder(sourceTupleSize)
+                .appendInt(10)
+                .appendString("foo")
+                .appendNull()
+                .appendBoolean(false)
+                .build();
+
+        BinaryTuplePrefix prefix = 
BinaryTuplePrefix.fromBinaryTuple(sourceTupleSize, new 
BinaryTuple(sourceTupleSize, buffer));
+
+        assertThat(prefix.elementCount(), equalTo(sourceTupleSize));
+        assertThat(prefix.intValue(0), equalTo(10));
+        assertThat(prefix.stringValue(1), equalTo("foo"));
+        assertThat(prefix.hasNullValue(2), equalTo(true));
+        assertThat(prefix.booleanValue(3), equalTo(false));
+    }
+
+    @Test
+    public void 
testCreatePrefixFromBinaryTupleWhichSizeIsGreaterThanRequired() {
+        int sourceTupleSize = 5;
+
+        ByteBuffer buffer = new BinaryTupleBuilder(sourceTupleSize)
+                .appendInt(10)
+                .appendString("foo")
+                .appendNull()
+                .appendBoolean(false)
+                .appendString("truncated value")
+                .build();
+
+        int prefixSize = 4;
+
+        BinaryTuplePrefix prefix = 
BinaryTuplePrefix.fromBinaryTuple(prefixSize, new BinaryTuple(sourceTupleSize, 
buffer));
+
+        assertThat(prefix.elementCount(), equalTo(prefixSize));
+        assertThat(prefix.intValue(0), equalTo(10));
+        assertThat(prefix.stringValue(1), equalTo("foo"));
+        assertThat(prefix.hasNullValue(2), equalTo(true));
+        assertThat(prefix.booleanValue(3), equalTo(false));
+    }
+
+    @Test
+    public void testCreatePrefixFromZeroLengthBinaryTuple() {
+        int sourceTupleSize = 0;
+
+        ByteBuffer buffer = new BinaryTupleBuilder(sourceTupleSize).build();
+
+        BinaryTuplePrefix prefix = BinaryTuplePrefix.fromBinaryTuple(4, new 
BinaryTuple(sourceTupleSize, buffer));
+
+        assertThat(prefix.elementCount(), equalTo(sourceTupleSize));
+        assertThat(prefix.hasNullValue(0), equalTo(true));
+        assertThat(prefix.hasNullValue(1), equalTo(true));
+        assertThat(prefix.hasNullValue(2), equalTo(true));
+        assertThat(prefix.hasNullValue(3), equalTo(true));
+    }
+
     /**
      * Tests construction of an invalid prefix.
      */
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeService.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeService.java
index 4211794044..db7ef9849f 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeService.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeService.java
@@ -17,12 +17,12 @@
 
 package org.apache.ignite.internal.sql.engine.exec;
 
-import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.sql.engine.exec.rel.Inbox;
 import org.apache.ignite.internal.sql.engine.exec.rel.Outbox;
+import 
org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -48,7 +48,7 @@ public interface ExchangeService extends LifecycleAware {
      *      which completes when the data has been sent.
      */
     CompletableFuture<Void> sendBatch(String nodeName, UUID queryId, long 
fragmentId, long exchangeId, int batchId, boolean last,
-            List<ByteBuffer> rows);
+            List<BinaryTupleMessage> rows);
 
     /**
      * Asynchronously requests data from the specified node.
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
index a9f390ef94..859670c7b9 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.sql.engine.exec;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 
-import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -36,6 +35,7 @@ import 
org.apache.ignite.internal.sql.engine.message.QueryBatchMessage;
 import org.apache.ignite.internal.sql.engine.message.QueryBatchRequestMessage;
 import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup;
 import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesFactory;
+import 
org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.lang.TraceableException;
 import org.jetbrains.annotations.Nullable;
@@ -76,7 +76,7 @@ public class ExchangeServiceImpl implements ExchangeService {
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Void> sendBatch(String nodeName, UUID qryId, long 
fragmentId, long exchangeId, int batchId,
-            boolean last, List<ByteBuffer> rows) {
+            boolean last, List<BinaryTupleMessage> rows) {
 
         return messageService.send(
                 nodeName,
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java
index 8120ef2ea2..c08a184545 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java
@@ -84,7 +84,7 @@ public class ExecutableTableRegistryImpl implements 
ExecutableTableRegistry {
                     );
 
                     InternalTable internalTable = table.internalTable();
-                    ScannableTable scannableTable = new 
ScannableTableImpl(internalTable, converterFactory, tableDescriptor);
+                    ScannableTable scannableTable = new 
ScannableTableImpl(internalTable, converterFactory);
 
                     UpdatableTableImpl updatableTable = new 
UpdatableTableImpl(sqlTable.id(), tableDescriptor, internalTable.partitions(),
                             replicaService, clock, 
converterFactory.create(null), schemaDescriptor);
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RowConverter.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RowConverter.java
deleted file mode 100644
index 60fd46c3f6..0000000000
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RowConverter.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.sql.engine.exec;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
-import org.apache.ignite.internal.binarytuple.BinaryTuplePrefixBuilder;
-import org.apache.ignite.internal.schema.BinaryRowConverter;
-import org.apache.ignite.internal.schema.BinaryTuple;
-import org.apache.ignite.internal.schema.BinaryTuplePrefix;
-import org.apache.ignite.internal.schema.BinaryTupleSchema;
-import org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
-import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
-import org.apache.ignite.internal.sql.engine.util.TypeUtils;
-import org.apache.ignite.internal.type.NativeTypeSpec;
-
-/**
- * Helper class provides method to convert binary tuple to rows and vice-versa.
- */
-public final class RowConverter {
-    /**
-     * Creates binary tuple schema for index rows.
-     */
-    public static BinaryTupleSchema createIndexRowSchema(List<String> 
indexedColumns, TableDescriptor tableDescriptor) {
-        Element[] elements = indexedColumns.stream()
-                .map(tableDescriptor::columnDescriptor)
-                .map(colDesc -> new Element(colDesc.physicalType(), true))
-                .toArray(Element[]::new);
-
-        return BinaryTupleSchema.create(elements);
-    }
-
-    /**
-     * Converts a search row, which represents prefix condition, to a binary 
tuple.
-     *
-     * @param ectx Execution context.
-     * @param binarySchema Binary tuple schema.
-     * @param factory Row handler factory.
-     * @param searchRow Search row.
-     * @param <RowT> Row type.
-     * @return Binary tuple.
-     */
-    static <RowT> BinaryTuplePrefix toBinaryTuplePrefix(
-            ExecutionContext<RowT> ectx,
-            BinaryTupleSchema binarySchema,
-            RowHandler.RowFactory<RowT> factory,
-            RowT searchRow
-    ) {
-        RowHandler<RowT> handler = factory.handler();
-
-        int indexedColumnsCount = binarySchema.elementCount();
-        int prefixColumnsCount = handler.columnCount(searchRow);
-
-        assert binarySchema.elementCount() >= prefixColumnsCount : "Invalid 
range condition";
-
-        BinaryTuplePrefixBuilder tupleBuilder = new 
BinaryTuplePrefixBuilder(prefixColumnsCount, indexedColumnsCount);
-
-        return new BinaryTuplePrefix(indexedColumnsCount, 
toByteBuffer(binarySchema, handler, tupleBuilder, searchRow));
-    }
-
-    /**
-     * Converts a search row, which represents exact value condition, to a 
binary tuple.
-     *
-     * @param ectx Execution context.
-     * @param binarySchema Binary tuple schema.
-     * @param factory Row handler factory.
-     * @param searchRow Search row.
-     * @param <RowT> Row type.
-     * @return Binary tuple.
-     */
-    public static <RowT> BinaryTuple toBinaryTuple(
-            ExecutionContext<RowT> ectx,
-            BinaryTupleSchema binarySchema,
-            RowHandler.RowFactory<RowT> factory,
-            RowT searchRow
-    ) {
-        RowHandler<RowT> handler = factory.handler();
-
-        int rowColumnsCount = handler.columnCount(searchRow);
-
-        assert rowColumnsCount == binarySchema.elementCount() : "Invalid 
lookup key.";
-
-        BinaryTupleBuilder tupleBuilder = new 
BinaryTupleBuilder(rowColumnsCount);
-
-        return new BinaryTuple(rowColumnsCount, toByteBuffer(binarySchema, 
handler, tupleBuilder, searchRow));
-    }
-
-    private static <RowT> ByteBuffer toByteBuffer(
-            BinaryTupleSchema binarySchema,
-            RowHandler<RowT> handler,
-            BinaryTupleBuilder tupleBuilder,
-            RowT searchRow
-    ) {
-        int columnsCount = handler.columnCount(searchRow);
-
-        for (int i = 0; i < columnsCount; i++) {
-            Object val = handler.get(i, searchRow);
-
-            Element element = binarySchema.element(i);
-
-            val = TypeUtils.fromInternal(val, 
NativeTypeSpec.toClass(element.typeSpec(), element.nullable()));
-
-            BinaryRowConverter.appendValue(tupleBuilder, element, val);
-        }
-
-        return tupleBuilder.build();
-    }
-}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RowHandler.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RowHandler.java
index 302be058c9..8b35056635 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RowHandler.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RowHandler.java
@@ -17,8 +17,8 @@
 
 package org.apache.ignite.internal.sql.engine.exec;
 
-import java.nio.ByteBuffer;
 import org.apache.ignite.internal.lang.InternalTuple;
+import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
 import org.jetbrains.annotations.Nullable;
 
@@ -64,12 +64,12 @@ public interface RowHandler<RowT> {
     int columnCount(RowT row);
 
     /**
-     * Assembly row representation as ByteBuffer.
+     * Assembly row representation as BinaryTuple.
      *
      * @param row Incoming data to be processed.
-     * @return {@link ByteBuffer} representation.
+     * @return {@link BinaryTuple} representation.
      */
-    ByteBuffer toByteBuffer(RowT row);
+    BinaryTuple toBinaryTuple(RowT row);
 
     /** String representation. */
     String toString(RowT row);
@@ -96,14 +96,6 @@ public interface RowHandler<RowT> {
          */
         RowT create(Object... fields);
 
-        /**
-         * Create row using incoming {@link ByteBuffer}.
-         *
-         * @param raw {@link ByteBuffer} representation.
-         * @return Instantiation defined representation.
-         */
-        RowT create(ByteBuffer raw);
-
         /**
          * Create row using incoming binary tuple.
          *
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTableImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTableImpl.java
index cfe637fa27..e0477f4515 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTableImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTableImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.sql.engine.exec;
 
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static 
org.apache.ignite.internal.storage.index.SortedIndexStorage.GREATER_OR_EQUAL;
 import static 
org.apache.ignite.internal.storage.index.SortedIndexStorage.LESS_OR_EQUAL;
 
@@ -27,10 +28,8 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.BinaryTuplePrefix;
-import org.apache.ignite.internal.schema.BinaryTupleSchema;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
 import org.apache.ignite.internal.sql.engine.exec.exp.RangeCondition;
-import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.util.subscription.TransformingPublisher;
 import org.apache.ignite.internal.utils.PrimaryReplica;
@@ -45,13 +44,10 @@ public class ScannableTableImpl implements ScannableTable {
 
     private final TableRowConverterFactory converterFactory;
 
-    private final TableDescriptor tableDescriptor;
-
     /** Constructor. */
-    public ScannableTableImpl(InternalTable internalTable, 
TableRowConverterFactory converterFactory, TableDescriptor tableDescriptor) {
+    public ScannableTableImpl(InternalTable internalTable, 
TableRowConverterFactory converterFactory) {
         this.internalTable = internalTable;
         this.converterFactory = converterFactory;
-        this.tableDescriptor = tableDescriptor;
     }
 
     /** {@inheritDoc} */
@@ -90,9 +86,8 @@ public class ScannableTableImpl implements ScannableTable {
             @Nullable RangeCondition<RowT> cond,
             @Nullable BitSet requiredColumns
     ) {
-
-        BinaryTupleSchema indexRowSchema = 
RowConverter.createIndexRowSchema(columns, tableDescriptor);
         TxAttributes txAttributes = ctx.txAttributes();
+        RowHandler<RowT> handler = rowFactory.handler();
 
         Publisher<BinaryRow> pub;
         BinaryTuplePrefix lower;
@@ -105,17 +100,21 @@ public class ScannableTableImpl implements ScannableTable 
{
             lower = null;
             upper = null;
         } else {
-            lower = toBinaryTuplePrefix(ctx, indexRowSchema, cond.lower(), 
rowFactory);
-            upper = toBinaryTuplePrefix(ctx, indexRowSchema, cond.upper(), 
rowFactory);
+            lower = toBinaryTuplePrefix(columns.size(), handler, cond.lower());
+            upper = toBinaryTuplePrefix(columns.size(), handler, cond.upper());
 
             flags |= (cond.lowerInclude()) ? GREATER_OR_EQUAL : 0;
             flags |= (cond.upperInclude()) ? LESS_OR_EQUAL : 0;
         }
 
         if (txAttributes.readOnly()) {
+            HybridTimestamp readTime = txAttributes.time();
+
+            assert readTime != null;
+
             pub = internalTable.scan(
                     partWithTerm.partId(),
-                    txAttributes.time(),
+                    readTime,
                     ctx.localNode(),
                     indexId,
                     lower,
@@ -148,24 +147,31 @@ public class ScannableTableImpl implements ScannableTable 
{
             PartitionWithTerm partWithTerm,
             RowFactory<RowT> rowFactory,
             int indexId,
-            List<String> columns, RowT key,
+            List<String> columns,
+            RowT key,
             @Nullable BitSet requiredColumns
     ) {
-
-        BinaryTupleSchema indexRowSchema = 
RowConverter.createIndexRowSchema(columns, tableDescriptor);
         TxAttributes txAttributes = ctx.txAttributes();
+        RowHandler<RowT> handler = rowFactory.handler();
         Publisher<BinaryRow> pub;
 
-        BinaryTuple keyTuple = toBinaryTuple(ctx, indexRowSchema, key, 
rowFactory);
+        BinaryTuple keyTuple = handler.toBinaryTuple(key);
+
+        assert keyTuple.elementCount() == columns.size()
+                : format("Key should contain exactly {} fields, but was {}", 
columns.size(), handler.toString(key));
 
         if (txAttributes.readOnly()) {
+            HybridTimestamp readTime = txAttributes.time();
+
+            assert readTime != null;
+
             pub = internalTable.lookup(
                     partWithTerm.partId(),
-                    txAttributes.time(),
+                    readTime,
                     ctx.localNode(),
                     indexId,
                     keyTuple,
-                    requiredColumns
+                    null
             );
         } else {
             pub = internalTable.lookup(
@@ -174,7 +180,7 @@ public class ScannableTableImpl implements ScannableTable {
                     new PrimaryReplica(ctx.localNode(), partWithTerm.term()),
                     indexId,
                     keyTuple,
-                    requiredColumns
+                    null
             );
         }
 
@@ -184,25 +190,16 @@ public class ScannableTableImpl implements ScannableTable 
{
     }
 
     private static <RowT> @Nullable BinaryTuplePrefix toBinaryTuplePrefix(
-            ExecutionContext<RowT> ctx,
-            BinaryTupleSchema indexRowSchema,
-            @Nullable RowT condition,
-            RowFactory<RowT> factory
+            int searchBoundSize,
+            RowHandler<RowT> handler,
+            @Nullable RowT prefix
     ) {
-        if (condition == null) {
+        if (prefix == null || handler.columnCount(prefix) == 0) {
             return null;
         }
 
-        return RowConverter.toBinaryTuplePrefix(ctx, indexRowSchema, factory, 
condition);
-    }
+        assert searchBoundSize >= handler.columnCount(prefix) : "Invalid range 
condition";
 
-    private static <RowT> @Nullable BinaryTuple 
toBinaryTuple(ExecutionContext<RowT> ctx, BinaryTupleSchema indexRowSchema,
-            @Nullable RowT condition, RowFactory<RowT> factory) {
-        if (condition == null) {
-            return null;
-        }
-
-        return RowConverter.toBinaryTuple(ctx, indexRowSchema, factory, 
condition);
+        return BinaryTuplePrefix.fromBinaryTuple(searchBoundSize, 
handler.toBinaryTuple(prefix));
     }
-
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlRowHandler.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlRowHandler.java
index 4551d3c232..5ac300e8c1 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlRowHandler.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlRowHandler.java
@@ -21,7 +21,6 @@ import static 
org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
-import java.nio.ByteBuffer;
 import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
@@ -29,6 +28,7 @@ import java.time.LocalTime;
 import java.util.BitSet;
 import java.util.List;
 import java.util.UUID;
+import org.apache.calcite.avatica.util.ByteString;
 import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
 import org.apache.ignite.internal.lang.IgniteStringBuilder;
 import org.apache.ignite.internal.lang.InternalTuple;
@@ -55,12 +55,11 @@ import org.jetbrains.annotations.Nullable;
  * </ul>
  *
  * <p>Each kind of rows is serialized to the same binary tuple format
- * using the {@link #toByteBuffer(RowWrapper) toByteBuffer} method.
+ * using the {@link #toBinaryTuple(RowWrapper)} method.
  *
- * <p>Factory methods {@link RowFactory#create(InternalTuple) 
wrap(InternalTuple)} and
- * {@link RowFactory#create(ByteBuffer) create(ByteBuffer)} allow create rows 
without
- * any additional conversions. But the fields in binary tuple must match the
- * factory {@link RowSchema row schema}.
+ * <p>Factory method {@link RowFactory#create(InternalTuple)} allows to
+ * create rows without any additional conversions. But the fields in
+ * binary tuple must match the factory {@link RowSchema row schema}.
  */
 public class SqlRowHandler implements RowHandler<RowWrapper> {
     public static final RowHandler<RowWrapper> INSTANCE = new SqlRowHandler();
@@ -138,8 +137,8 @@ public class SqlRowHandler implements 
RowHandler<RowWrapper> {
     }
 
     @Override
-    public ByteBuffer toByteBuffer(RowWrapper row) {
-        return row.toByteBuffer();
+    public BinaryTuple toBinaryTuple(RowWrapper row) {
+        return row.toBinaryTuple();
     }
 
     @Override
@@ -167,12 +166,6 @@ public class SqlRowHandler implements 
RowHandler<RowWrapper> {
                 return new ObjectsArrayRowWrapper(rowSchema, fields);
             }
 
-            /** {@inheritDoc} */
-            @Override
-            public RowWrapper create(ByteBuffer buf) {
-                return create(new BinaryTuple(schemaLen, buf));
-            }
-
             /** {@inheritDoc} */
             @Override
             public RowWrapper create(InternalTuple tuple) {
@@ -193,9 +186,9 @@ public class SqlRowHandler implements 
RowHandler<RowWrapper> {
 
         abstract @Nullable Object get(int field);
 
-        abstract void set(int field, Object value);
+        abstract void set(int field, @Nullable Object value);
 
-        abstract ByteBuffer toByteBuffer();
+        abstract BinaryTuple toBinaryTuple();
     }
 
     /**
@@ -226,8 +219,45 @@ public class SqlRowHandler implements 
RowHandler<RowWrapper> {
         }
 
         @Override
-        ByteBuffer toByteBuffer() {
-            BinaryTupleBuilder tupleBuilder = new 
BinaryTupleBuilder(row.length);
+        BinaryTuple toBinaryTuple() {
+            int estimatedSize = 0;
+            boolean exactEstimate = true;
+            for (int i = 0; i < row.length; i++) {
+                NativeType nativeType = 
RowSchemaTypes.toNativeType(rowSchema.fields().get(i));
+
+                if (nativeType == null) {
+                    assert row[i] == null;
+
+                    continue;
+                }
+
+                Object value = row[i];
+
+                if (value == null) {
+                    continue;
+                }
+
+                if (nativeType.spec().fixedLength()) {
+                    estimatedSize += nativeType.sizeInBytes();
+                } else {
+                    if (value instanceof String) {
+                        // every character in the string may contain up to 4 
bytes.
+                        // Let's be optimistic here and reserve buffer only 
for the smallest
+                        // possible variant
+
+                        estimatedSize += ((String) value).length();
+                        exactEstimate = false;
+                    } else if (value instanceof ByteString) {
+                        estimatedSize += ((ByteString) value).length();
+                    } else {
+                        assert (value instanceof BigDecimal) || (value 
instanceof BigInteger) : "unexpected value " + value.getClass();
+
+                        exactEstimate = false;
+                    }
+                }
+            }
+
+            BinaryTupleBuilder tupleBuilder = new 
BinaryTupleBuilder(row.length, estimatedSize, exactEstimate);
 
             for (int i = 0; i < row.length; i++) {
                 Object value = row[i];
@@ -235,7 +265,7 @@ public class SqlRowHandler implements 
RowHandler<RowWrapper> {
                 appendValue(tupleBuilder, rowSchema.fields().get(i), value);
             }
 
-            return tupleBuilder.build();
+            return new BinaryTuple(row.length, tupleBuilder.build());
         }
 
         @Override
@@ -243,7 +273,7 @@ public class SqlRowHandler implements 
RowHandler<RowWrapper> {
             return rowSchema;
         }
 
-        private void appendValue(BinaryTupleBuilder builder, TypeSpec 
schemaType, @Nullable Object value) {
+        private static void appendValue(BinaryTupleBuilder builder, TypeSpec 
schemaType, @Nullable Object value) {
             if (value == null) {
                 builder.appendNull();
 
@@ -374,8 +404,12 @@ public class SqlRowHandler implements 
RowHandler<RowWrapper> {
         }
 
         @Override
-        ByteBuffer toByteBuffer() {
-            return tuple.byteBuffer();
+        BinaryTuple toBinaryTuple() {
+            if (tuple instanceof BinaryTuple) {
+                return (BinaryTuple) tuple;
+            }
+
+            return new BinaryTuple(tuple.elementCount(), tuple.byteBuffer());
         }
 
         @Override
@@ -383,7 +417,7 @@ public class SqlRowHandler implements 
RowHandler<RowWrapper> {
             return rowSchema;
         }
 
-        private @Nullable Object readValue(InternalTuple tuple, NativeType 
nativeType, int fieldIndex) {
+        private static @Nullable Object readValue(InternalTuple tuple, 
NativeType nativeType, int fieldIndex) {
             switch (nativeType.spec()) {
                 case BOOLEAN: return tuple.booleanValueBoxed(fieldIndex);
                 case INT8: return tuple.byteValueBoxed(fieldIndex);
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
index ba70a02e78..c3fcb8928c 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.sql.engine.exec.rel;
 import static org.apache.calcite.util.Util.unexpected;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
@@ -39,6 +38,7 @@ import 
org.apache.ignite.internal.sql.engine.exec.MailboxRegistry;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
 import org.apache.ignite.internal.sql.engine.exec.SharedState;
 import org.apache.ignite.internal.sql.engine.exec.rel.Inbox.RemoteSource.State;
+import 
org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.lang.ErrorGroups.Common;
 import org.jetbrains.annotations.Nullable;
@@ -163,15 +163,15 @@ public class Inbox<RowT> extends AbstractNode<RowT> 
implements Mailbox<RowT>, Si
      * @param last Last batch flag.
      * @param rows Rows.
      */
-    public void onBatchReceived(String srcNodeName, int batchId, boolean last, 
List<ByteBuffer> rows) throws Exception {
+    public void onBatchReceived(String srcNodeName, int batchId, boolean last, 
List<BinaryTupleMessage> rows) throws Exception {
         RemoteSource<RowT> source = perNodeBuffers.get(srcNodeName);
 
         boolean waitingBefore = source.check() == State.WAITING;
 
         List<RowT> rows0 = new ArrayList<>(rows.size());
 
-        for (ByteBuffer row : rows) {
-            rows0.add(rowFactory.create(row));
+        for (BinaryTupleMessage row : rows) {
+            rows0.add(rowFactory.create(row.asBinaryTuple()));
         }
 
         source.onBatchReceived(batchId, last, rows0);
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
index fb6560c60f..0f6e387ab1 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.sql.engine.exec.rel;
 
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Deque;
@@ -32,6 +31,7 @@ import 
org.apache.ignite.internal.lang.IgniteInternalCheckedException;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.sql.engine.exec.ExchangeService;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.sql.engine.exec.MailboxRegistry;
@@ -39,6 +39,8 @@ import org.apache.ignite.internal.sql.engine.exec.RowHandler;
 import org.apache.ignite.internal.sql.engine.exec.SharedState;
 import org.apache.ignite.internal.sql.engine.trait.Destination;
 import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import 
org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.lang.ErrorGroups.Common;
 import org.jetbrains.annotations.Nullable;
@@ -48,6 +50,7 @@ import org.jetbrains.annotations.Nullable;
  */
 public class Outbox<RowT> extends AbstractNode<RowT> implements Mailbox<RowT>, 
SingleNode<RowT>, Downstream<RowT> {
     private static final IgniteLogger LOG = Loggers.forClass(Outbox.class);
+    private static final TableMessagesFactory TABLE_MESSAGES_FACTORY = new 
TableMessagesFactory();
 
     private final long exchangeId;
     private final long targetFragmentId;
@@ -236,10 +239,17 @@ public class Outbox<RowT> extends AbstractNode<RowT> 
implements Mailbox<RowT>, S
     private void sendBatch(String nodeName, int batchId, boolean last, 
List<RowT> rows) {
         RowHandler<RowT> handler = context().rowHandler();
 
-        List<ByteBuffer> rows0 = new ArrayList<>(rows.size());
+        List<BinaryTupleMessage> rows0 = new ArrayList<>(rows.size());
 
         for (RowT row : rows) {
-            rows0.add(handler.toByteBuffer(row));
+            BinaryTuple tuple = handler.toBinaryTuple(row);
+
+            rows0.add(
+                    TABLE_MESSAGES_FACTORY.binaryTupleMessage()
+                            .elementCount(tuple.elementCount())
+                            .tuple(tuple.byteBuffer())
+                            .build()
+            );
         }
 
         exchange.sendBatch(nodeName, queryId(), targetFragmentId, exchangeId, 
batchId, last, rows0)
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchMessage.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchMessage.java
index b1cc051270..7174aee040 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchMessage.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchMessage.java
@@ -17,8 +17,8 @@
 
 package org.apache.ignite.internal.sql.engine.message;
 
-import java.nio.ByteBuffer;
 import java.util.List;
+import 
org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
 import org.apache.ignite.network.annotations.Transferable;
 
 /**
@@ -45,5 +45,5 @@ public interface QueryBatchMessage extends 
ExecutionContextAwareMessage {
     /**
      * Get rows.
      */
-    List<ByteBuffer> rows();
+    List<BinaryTupleMessage> rows();
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
index 49947228b1..fb3a07f4b2 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.sql.engine.exec.rel;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 
 import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
-import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
 import java.util.Deque;
 import java.util.Iterator;
@@ -56,7 +55,6 @@ import 
org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.thread.LogUncaughtExceptionHandler;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
-import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.Pair;
 import org.apache.ignite.network.ClusterNodeImpl;
 import org.apache.ignite.network.NetworkAddress;
@@ -355,11 +353,6 @@ public abstract class AbstractExecutionTest<T> extends 
IgniteAbstractTest {
                 return fields;
             }
 
-            @Override
-            public Object[] create(ByteBuffer raw) {
-                return ByteUtils.fromBytes(raw.array());
-            }
-
             @Override
             public Object[] create(InternalTuple tuple) {
                 throw new UnsupportedOperationException();
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
index 5a1569f007..c2100b174b 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
@@ -49,6 +49,7 @@ import 
org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
 import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler;
 import org.apache.ignite.internal.sql.engine.exec.mapping.FragmentDescription;
+import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
 import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
 import org.apache.ignite.internal.sql.engine.framework.ClusterServiceFactory;
 import org.apache.ignite.internal.sql.engine.framework.DataProvider;
@@ -62,6 +63,7 @@ import 
org.apache.ignite.internal.testframework.IgniteTestUtils;
 import 
org.apache.ignite.internal.testframework.IgniteTestUtils.PredicateMatcher;
 import 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher;
 import 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher;
+import org.apache.ignite.internal.type.NativeTypes;
 import org.apache.ignite.internal.util.AsyncCursor.BatchedResult;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.network.ClusterNode;
@@ -93,9 +95,18 @@ public class ExchangeExecutionTest extends 
AbstractExecutionTest<Object[]> {
 
     private static final Map<String, QueryTaskExecutor> executors = new 
HashMap<>();
 
-    public static final CustomMatcher<Object[]> ODD_KEY_MATCHER = new 
PredicateMatcher<>(e -> ((int) (e[0])) % 2 != 0, "odd key");
+    private static final CustomMatcher<Object[]> ODD_KEY_MATCHER = new 
PredicateMatcher<>(e -> ((int) (e[0])) % 2 != 0, "odd key");
 
-    public static final CustomMatcher<Object[]> EVEN_KEY_MATCHER = new 
PredicateMatcher<>(e -> ((int) (e[0])) % 2 == 0, "even key");
+    private static final CustomMatcher<Object[]> EVEN_KEY_MATCHER = new 
PredicateMatcher<>(e -> ((int) (e[0])) % 2 == 0, "even key");
+
+    /**
+     * Schema of the rows used in the tests. All data providers created within 
this test class must
+     * conform to this row schema.
+     */
+    private static final RowSchema ROW_SCHEMA = RowSchema.builder()
+            .addField(NativeTypes.INT32)
+            .addField(NativeTypes.INT32)
+            .build();
 
     private final Map<String, MailboxRegistry> mailboxes = new HashMap<>();
     private final Map<String, ExchangeService> exchangeServices = new 
HashMap<>();
@@ -496,7 +507,7 @@ public class ExchangeExecutionTest extends 
AbstractExecutionTest<Object[]> {
                 createExchangeService(taskExecutor, 
serviceFactory.forNode(localNode.name()), mailboxRegistry));
 
         Inbox<Object[]> inbox = new Inbox<>(
-                targetCtx, exchangeService, mailboxRegistry, sourceNodeNames, 
comparator, rowFactory(),
+                targetCtx, exchangeService, mailboxRegistry, sourceNodeNames, 
comparator, rowHandler().factory(ROW_SCHEMA),
                 SOURCE_FRAGMENT_ID, SOURCE_FRAGMENT_ID
         );
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java
index 3efdb7697b..0681bed511 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java
@@ -26,26 +26,22 @@ import static org.apache.calcite.rel.core.JoinRelType.LEFT;
 import static org.apache.calcite.rel.core.JoinRelType.RIGHT;
 import static org.apache.calcite.rel.core.JoinRelType.SEMI;
 import static 
org.apache.ignite.internal.sql.engine.util.Commons.getFieldFromBiRows;
-import static 
org.apache.ignite.internal.sql.engine.util.TypeUtils.rowSchemaFromRelTypes;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
 import static org.apache.ignite.internal.util.ArrayUtils.asList;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Stream;
-import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler;
-import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
 import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
 import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
@@ -127,30 +123,6 @@ public class ExecutionTest extends 
AbstractExecutionTest<Object[]> {
         assertArrayEquals(new Object[]{2, "Ivan", "Ignite"}, rows.get(1));
     }
 
-    @Test
-    public void testRowFactoryAssembly() {
-        ExecutionContext<Object[]> ctx = executionContext(false);
-
-        RelDataType rowType = TypeUtils.createRowType(ctx.getTypeFactory(),
-                TypeUtils.native2relationalTypes(ctx.getTypeFactory(), 
NativeTypes.INT32, NativeTypes.STRING, NativeTypes.BOOLEAN));
-
-        RowSchema rowSchema = 
rowSchemaFromRelTypes(RelOptUtil.getFieldTypeList(rowType));
-
-        RowFactory<Object[]> rowFactory = ctx.rowHandler().factory(rowSchema);
-
-        Object[] row1 = rowFactory.create();
-
-        ctx.rowHandler().set(0, row1, 1);
-        ctx.rowHandler().set(1, row1, "2");
-        ctx.rowHandler().set(2, row1, false);
-
-        ByteBuffer bb = ctx.rowHandler().toByteBuffer(row1);
-
-        Object[] row2 = rowFactory.create(bb);
-
-        assertArrayEquals(row1, row2);
-    }
-
     @Test
     public void testUnionAll() {
         ExecutionContext<Object[]> ctx = executionContext(true);
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
index 201298c0c5..83f9a928cf 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
@@ -69,9 +69,6 @@ import 
org.apache.ignite.internal.sql.engine.exec.exp.RangeCondition;
 import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
 import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
 import org.apache.ignite.internal.sql.engine.framework.NoOpTransaction;
-import 
org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest.TestTableDescriptor;
-import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
-import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.table.InternalTable;
@@ -491,7 +488,7 @@ public class ScannableTableSelfTest extends 
BaseIgniteAbstractTest {
                     eq(tx.clusterNode()),
                     eq(indexId),
                     any(BinaryTuple.class),
-                    eq(tester.requiredFields)
+                    eq(null)
             );
         } else {
             PrimaryReplica primaryReplica = new 
PrimaryReplica(ctx.localNode(), term);
@@ -502,7 +499,7 @@ public class ScannableTableSelfTest extends 
BaseIgniteAbstractTest {
                     eq(primaryReplica),
                     eq(indexId),
                     any(BinaryTuple.class),
-                    eq(tester.requiredFields)
+                    eq(null)
             );
         }
 
@@ -549,8 +546,6 @@ public class ScannableTableSelfTest extends 
BaseIgniteAbstractTest {
 
     private class Tester {
 
-        final TableDescriptor tableDescriptor;
-
         final ScannableTable scannableTable;
 
         final TestInput input;
@@ -562,8 +557,7 @@ public class ScannableTableSelfTest extends 
BaseIgniteAbstractTest {
         Tester(TestInput input) {
             this.input = input;
             rowConverter = new RowCollectingTableRwoConverter(input);
-            tableDescriptor = new 
TestTableDescriptor(IgniteDistributions::single, input.rowType);
-            scannableTable = new ScannableTableImpl(internalTable, rf -> 
rowConverter, tableDescriptor);
+            scannableTable = new ScannableTableImpl(internalTable, rf -> 
rowConverter);
         }
 
         ResultCollector tableScan(int partitionId, long term, NoOpTransaction 
tx) {
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
index 964c23726b..cd470cae7f 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
@@ -52,9 +52,6 @@ import 
org.apache.ignite.internal.sql.engine.exec.ScannableTableImpl;
 import org.apache.ignite.internal.sql.engine.exec.TableRowConverter;
 import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
 import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
-import 
org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest.TestTableDescriptor;
-import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
-import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.sql.engine.util.TypeUtils;
@@ -136,8 +133,7 @@ public class TableScanNodeExecutionTest extends 
AbstractExecutionTest<Object[]>
                     return (RowT) TestInternalTableImpl.ROW;
                 }
             };
-            TableDescriptor descriptor = new 
TestTableDescriptor(IgniteDistributions::single, rowType);
-            ScannableTableImpl scanableTable = new 
ScannableTableImpl(internalTable, rf -> rowConverter, descriptor);
+            ScannableTableImpl scanableTable = new 
ScannableTableImpl(internalTable, rf -> rowConverter);
             TableScanNode<Object[]> scanNode = new TableScanNode<>(ctx, 
rowFactory, scanableTable,
                     partsWithTerms, null, null, null);
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/row/SqlRowHandlerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/row/SqlRowHandlerTest.java
index a876b91a74..ecf4fecbbf 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/row/SqlRowHandlerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/row/SqlRowHandlerTest.java
@@ -19,10 +19,10 @@ package org.apache.ignite.internal.sql.engine.exec.row;
 
 import static 
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.generateValueByType;
 import static 
org.apache.ignite.internal.sql.engine.util.TypeUtils.columnType2NativeType;
+import static org.apache.ignite.internal.sql.engine.util.TypeUtils.toInternal;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -73,25 +73,19 @@ public class SqlRowHandlerTest extends IgniteAbstractTest {
         int elementsCount = schema.fields().size();
 
         RowFactory<RowWrapper> factory = handler.factory(schema);
-        RowWrapper src = factory.create(sourceData);
+        RowWrapper src = factory.create(wrap(sourceData));
 
         // Serialization to binary tuple representation.
-        ByteBuffer buf = handler.toByteBuffer(src);
-
-        BinaryTuple tuple = new BinaryTuple(elementsCount, buf);
-        RowWrapper destWrap = factory.create(tuple);
-        RowWrapper dest = factory.create(buf);
+        BinaryTuple tuple = handler.toBinaryTuple(src);
+        RowWrapper dest = factory.create(tuple);
 
         for (int i = 0; i < elementsCount; i++) {
             String msg = schema.fields().get(i).toString();
 
-            assertThat(msg, handler.get(i, src), equalTo(sourceData[i]));
-
-            // Binary tuple wrapper must return data in internal format.
-            Object expected = TypeUtils.toInternal(sourceData[i]);
+            Object expected = toInternal(sourceData[i]);
 
+            assertThat(msg, handler.get(i, src), equalTo(expected));
             assertThat(msg, handler.get(i, dest), equalTo(expected));
-            assertThat(msg, handler.get(i, destWrap), equalTo(expected));
         }
     }
 
@@ -116,10 +110,10 @@ public class SqlRowHandlerTest extends IgniteAbstractTest 
{
         RowSchema concatenatedSchema = builder.build();
 
         // Serialize.
-        ByteBuffer buf = handler.toByteBuffer(concatenated);
+        BinaryTuple tuple = handler.toBinaryTuple(concatenated);
 
         // Wrap into row.
-        RowWrapper result = handler.factory(concatenatedSchema).create(new 
BinaryTuple(totalElementsCount, buf));
+        RowWrapper result = handler.factory(concatenatedSchema).create(tuple);
 
         for (int i = 0; i < leftLen; i++) {
             assertThat(handler.get(i, result), 
equalTo(TypeUtils.toInternal(params.leftData[i])));
@@ -149,13 +143,13 @@ public class SqlRowHandlerTest extends IgniteAbstractTest 
{
         RowFactory<RowWrapper> factory = handler.factory(schema);
 
         RowWrapper srcRow = factory.create(sourceData);
-        RowWrapper srcBinRow = factory.create(new 
BinaryTuple(handler.columnCount(srcRow), handler.toByteBuffer(srcRow)));
+        RowWrapper srcBinRow = factory.create(handler.toBinaryTuple(srcRow));
 
         RowWrapper mappedRow = handler.map(srcRow, mapping);
         RowWrapper mappedFromBinRow = handler.map(srcBinRow, mapping);
 
         RowSchema mappedSchema = rowSchema(columnTypes.subList(0, 
mapping.length), Arrays.copyOf(sourceData, mapping.length));
-        RowWrapper deserializedMappedBinRow = 
handler.factory(mappedSchema).create(handler.toByteBuffer(mappedFromBinRow));
+        RowWrapper deserializedMappedBinRow = 
handler.factory(mappedSchema).create(handler.toBinaryTuple(mappedFromBinRow));
 
         assertThat(handler.columnCount(mappedRow), equalTo(mapping.length));
         assertThat(handler.columnCount(mappedFromBinRow), 
equalTo(mapping.length));
@@ -213,6 +207,16 @@ public class SqlRowHandlerTest extends IgniteAbstractTest {
         return values;
     }
 
+    private static Object[] wrap(Object[] values) {
+        Object[] newValues = new Object[values.length];
+
+        for (int i = 0; i < values.length; i++) {
+            newValues[i] = toInternal(values[i]);
+        }
+
+        return newValues;
+    }
+
     private List<ColumnType> columnTypes() {
         List<ColumnType> columnTypes = new ArrayList<>(
                 // TODO Include ignored types to test after 
https://issues.apache.org/jira/browse/IGNITE-15200
@@ -244,11 +248,11 @@ public class SqlRowHandlerTest extends IgniteAbstractTest 
{
             RowFactory<RowWrapper> factory1 = handler.factory(leftSchema);
             RowFactory<RowWrapper> factory2 = handler.factory(rightSchema);
 
-            RowWrapper left = factory1.create(leftData);
-            RowWrapper right = factory2.create(rightData);
+            RowWrapper left = factory1.create(wrap(leftData));
+            RowWrapper right = factory2.create(wrap(rightData));
 
-            this.left = leftTupleRequired ? 
factory1.create(handler.toByteBuffer(left)) : left;
-            this.right = rightTupleRequired ? 
factory2.create(handler.toByteBuffer(right)) : right;
+            this.left = leftTupleRequired ? 
factory1.create(handler.toBinaryTuple(left)) : left;
+            this.right = rightTupleRequired ? 
factory2.create(handler.toBinaryTuple(right)) : right;
         }
     }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ArrayRowHandler.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ArrayRowHandler.java
index 21c9f69e9f..c80817c63b 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ArrayRowHandler.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ArrayRowHandler.java
@@ -17,13 +17,29 @@
 
 package org.apache.ignite.internal.sql.engine.framework;
 
-import java.nio.ByteBuffer;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
 import java.util.Arrays;
+import java.util.BitSet;
+import java.util.UUID;
+import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
 import org.apache.ignite.internal.lang.InternalTuple;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.InvalidTypeException;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler;
 import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
+import org.apache.ignite.internal.sql.engine.exec.row.RowSchemaTypes;
+import org.apache.ignite.internal.sql.engine.util.TypeUtils;
+import org.apache.ignite.internal.type.DecimalNativeType;
+import org.apache.ignite.internal.type.NativeType;
+import org.apache.ignite.internal.type.NativeTypeSpec;
+import org.apache.ignite.internal.type.NativeTypes;
 import org.apache.ignite.internal.util.ArrayUtils;
-import org.apache.ignite.internal.util.ByteUtils;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Handler for rows that implemented as a simple objects array.
@@ -71,9 +87,14 @@ public class ArrayRowHandler implements RowHandler<Object[]> 
{
     }
 
     @Override
-    public ByteBuffer toByteBuffer(Object[] row) {
-        byte[] raw = ByteUtils.toBytes(row);
-        return ByteBuffer.wrap(raw);
+    public BinaryTuple toBinaryTuple(Object[] row) {
+        BinaryTupleBuilder tupleBuilder = new BinaryTupleBuilder(row.length);
+
+        for (Object value : row) {
+            appendValue(tupleBuilder, value);
+        }
+
+        return new BinaryTuple(row.length, tupleBuilder.build());
     }
 
     /** {@inheritDoc} */
@@ -108,17 +129,137 @@ public class ArrayRowHandler implements 
RowHandler<Object[]> {
                 return fields;
             }
 
-            /** {@inheritDoc} */
-            @Override
-            public Object[] create(ByteBuffer raw) {
-                return ByteUtils.fromBytes(raw.array());
-            }
-
             /** {@inheritDoc} */
             @Override
             public Object[] create(InternalTuple tuple) {
-                throw new UnsupportedOperationException();
+                Object[] row = new Object[tuple.elementCount()];
+
+                for (int i = 0; i < row.length; i++) {
+                    NativeType nativeType = 
RowSchemaTypes.toNativeType(rowSchema.fields().get(i));
+
+                    if (nativeType == null) {
+                        row[i] = null;
+
+                        continue;
+                    }
+
+                    row[i] = readValue(tuple, nativeType, i);
+                }
+
+                return row;
             }
         };
     }
+
+    private static void appendValue(BinaryTupleBuilder builder, @Nullable 
Object value) {
+        if (value == null) {
+            builder.appendNull();
+
+            return;
+        }
+
+        NativeType nativeType = NativeTypes.fromObject(value);
+
+        assert nativeType != null;
+
+        value = TypeUtils.fromInternal(value, 
NativeTypeSpec.toClass(nativeType.spec(), true));
+
+        assert value != null : nativeType;
+
+        switch (nativeType.spec()) {
+            case BOOLEAN:
+                builder.appendBoolean((boolean) value);
+                break;
+
+            case INT8:
+                builder.appendByte((byte) value);
+                break;
+
+            case INT16:
+                builder.appendShort((short) value);
+                break;
+
+            case INT32:
+                builder.appendInt((int) value);
+                break;
+
+            case INT64:
+                builder.appendLong((long) value);
+                break;
+
+            case FLOAT:
+                builder.appendFloat((float) value);
+                break;
+
+            case DOUBLE:
+                builder.appendDouble((double) value);
+                break;
+
+            case NUMBER:
+                builder.appendNumberNotNull((BigInteger) value);
+                break;
+
+            case DECIMAL:
+                builder.appendDecimalNotNull((BigDecimal) value, 
((DecimalNativeType) nativeType).scale());
+                break;
+
+            case UUID:
+                builder.appendUuidNotNull((UUID) value);
+                break;
+
+            case BYTES:
+                builder.appendBytesNotNull((byte[]) value);
+                break;
+
+            case STRING:
+                builder.appendStringNotNull((String) value);
+                break;
+
+            case BITMASK:
+                builder.appendBitmaskNotNull((BitSet) value);
+                break;
+
+            case DATE:
+                builder.appendDateNotNull((LocalDate) value);
+                break;
+
+            case TIME:
+                builder.appendTimeNotNull((LocalTime) value);
+                break;
+
+            case DATETIME:
+                builder.appendDateTimeNotNull((LocalDateTime) value);
+                break;
+
+            case TIMESTAMP:
+                builder.appendTimestampNotNull((Instant) value);
+                break;
+
+            default:
+                throw new UnsupportedOperationException("Unknown type " + 
nativeType);
+        }
+    }
+
+    private static @Nullable Object readValue(InternalTuple tuple, NativeType 
nativeType, int fieldIndex) {
+        switch (nativeType.spec()) {
+            case BOOLEAN: return tuple.booleanValueBoxed(fieldIndex);
+            case INT8: return tuple.byteValueBoxed(fieldIndex);
+            case INT16: return tuple.shortValueBoxed(fieldIndex);
+            case INT32: return tuple.intValueBoxed(fieldIndex);
+            case INT64: return tuple.longValueBoxed(fieldIndex);
+            case FLOAT: return tuple.floatValueBoxed(fieldIndex);
+            case DOUBLE: return tuple.doubleValueBoxed(fieldIndex);
+            case DECIMAL: return tuple.decimalValue(fieldIndex, 
((DecimalNativeType) nativeType).scale());
+            case UUID: return tuple.uuidValue(fieldIndex);
+            case STRING: return tuple.stringValue(fieldIndex);
+            case BYTES: return tuple.bytesValue(fieldIndex);
+            case BITMASK: return tuple.bitmaskValue(fieldIndex);
+            case NUMBER: return tuple.numberValue(fieldIndex);
+            case DATE: return tuple.dateValue(fieldIndex);
+            case TIME: return tuple.timeValue(fieldIndex);
+            case DATETIME: return tuple.dateTimeValue(fieldIndex);
+            case TIMESTAMP: return tuple.timestampValue(fieldIndex);
+            default: throw new InvalidTypeException("Unknown element type: " + 
nativeType);
+        }
+    }
 }

Reply via email to