This is an automated email from the ASF dual-hosted git repository.

korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new d80a17dfbb IGNITE-21797: Fix column's type mismatch during marshalling 
of pojo (embedded API) (#3466)
d80a17dfbb is described below

commit d80a17dfbb47d0b8ecf331b48a536c1d7b2c0a34
Author: Max Zhuravkov <[email protected]>
AuthorDate: Wed Mar 27 14:34:34 2024 +0200

    IGNITE-21797: Fix column's type mismatch during marshalling of pojo 
(embedded API) (#3466)
---
 .../ignite/internal/client/table/ClientSchema.java |   2 +-
 .../ignite/internal/compute/ItComputeBaseTest.java |  12 +
 .../ignite/internal/marshaller/Marshaller.java     |  57 ++-
 .../internal/marshaller/MarshallerColumn.java      |  23 +-
 .../marshaller/ReflectionMarshallersProvider.java  |  11 +-
 .../ReflectionMarshallersProviderSelfTest.java     |  74 +++-
 .../ignite/internal/schema/BinaryRowConverter.java |  17 +-
 .../ignite/internal/schema/BinaryTupleSchema.java  |  34 +-
 .../internal/schema/marshaller/KvMarshaller.java   |   9 +
 .../internal/schema/marshaller/MarshallerUtil.java |   1 +
 .../marshaller/reflection/KvMarshallerImpl.java    |  35 +-
 .../schema/marshaller/reflection/RowReader.java    |  98 +++--
 .../internal/schema/BinaryRowConverterTest.java    | 132 +++++++
 .../internal/schema/BinaryTupleSchemaTest.java     | 100 +++++-
 .../schema/marshaller/KvMarshallerTest.java        | 173 ++++++++-
 .../sql/api/ItKvKeyColumnPositionTest.java         | 395 +++++++++++++++++++++
 .../ignite/internal/table/KeyValueViewImpl.java    |   2 +-
 .../PartitionReplicaListenerIndexLockingTest.java  |   4 +-
 .../replication/PartitionReplicaListenerTest.java  |   2 +-
 19 files changed, 1095 insertions(+), 86 deletions(-)

diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientSchema.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientSchema.java
index 53f39ce3b1..e3c1edb647 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientSchema.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientSchema.java
@@ -268,7 +268,7 @@ public class ClientSchema {
     }
 
     private static MarshallerColumn marshallerColumn(ClientColumn col) {
-        return new MarshallerColumn(col.name(), mode(col.type()), null, 
col.scale());
+        return new MarshallerColumn(col.schemaIndex(), col.name(), 
mode(col.type()), null, col.scale());
     }
 
     private static BinaryMode mode(ColumnType dataType) {
diff --git 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
index c2352d7254..75beb2be9f 100644
--- 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
+++ 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
@@ -266,6 +266,18 @@ public abstract class ItComputeBaseTest extends 
ClusterPerClassIntegrationTest {
         assertThat(execution.cancelAsync(), willBe(false));
     }
 
+    @Test
+    public void executesColocatedWithNonConsecutiveKeyColumnOrder() {
+        sql("DROP TABLE IF EXISTS test");
+        sql("CREATE TABLE test (k int, key_int int, v int, key_str VARCHAR, 
CONSTRAINT PK PRIMARY KEY (key_int, key_str))");
+        sql("INSERT INTO test VALUES (1, 2, 3, '4')");
+
+        IgniteImpl entryNode = node(0);
+        String actualNodeName = entryNode.compute()
+                .executeColocated("test", Tuple.create(Map.of("key_int", 2, 
"key_str", "4")), units(), getNodeNameJobClassName());
+        assertThat(actualNodeName, in(allNodeNames()));
+    }
+
     @Test
     void executeColocatedThrowsTableNotFoundExceptionWhenTableDoesNotExist() {
         IgniteImpl entryNode = node(0);
diff --git 
a/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/Marshaller.java
 
b/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/Marshaller.java
index 558932de52..506b38a966 100644
--- 
a/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/Marshaller.java
+++ 
b/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/Marshaller.java
@@ -23,7 +23,6 @@ import java.util.Collection;
 import java.util.Objects;
 import java.util.Set;
 import java.util.TreeSet;
-import java.util.stream.IntStream;
 import org.apache.ignite.internal.marshaller.FieldAccessor.IdentityAccessor;
 import org.apache.ignite.internal.util.Factory;
 import org.apache.ignite.internal.util.ObjectFactory;
@@ -76,12 +75,12 @@ public abstract class Marshaller {
      * @return Marshaller.
      */
     private static SimpleMarshaller simpleMarshaller(MarshallerColumn[] cols, 
OneColumnMapper<?> mapper) {
-        int colIdx = findColumnIndex(cols, mapper.mappedColumn());
+        MarshallerColumn column = findColumnIndex(cols, mapper.mappedColumn());
 
-        return new SimpleMarshaller(createIdentityAccessor(cols[colIdx], 
colIdx, mapper.converter()));
+        return new SimpleMarshaller(createIdentityAccessor(column, 
column.schemaIndex(), mapper.converter()));
     }
 
-    private static int findColumnIndex(MarshallerColumn[] cols, @Nullable 
String name) {
+    private static MarshallerColumn findColumnIndex(MarshallerColumn[] cols, 
@Nullable String name) {
         if (name == null) {
             if (cols.length != 1) {
                 throw new IllegalArgumentException(String.format(
@@ -90,16 +89,19 @@ public abstract class Marshaller {
                 ));
             }
 
-            return 0;
+            return cols[0];
         }
 
-        return IntStream.range(0, cols.length)
-                .filter(i -> cols[i].name().equals(name))
-                .findFirst()
-                .orElseThrow(() -> new IllegalArgumentException(String.format(
-                        "Failed to map object to a single column: mappedColumn 
'%s' is not present in the schema",
-                        name
-                )));
+        for (MarshallerColumn column : cols) {
+            if (column.name().equals(name)) {
+                return column;
+            }
+        }
+
+        throw new IllegalArgumentException(String.format(
+                "Failed to map object to a single column: mappedColumn '%s' is 
not present in the schema",
+                name
+        ));
     }
 
     /**
@@ -122,7 +124,6 @@ public abstract class Marshaller {
         // Build handlers.
         for (int i = 0; i < cols.length; i++) {
             MarshallerColumn col = cols[i];
-
             String columnName = col.name();
 
             String fieldName = mapper.fieldForColumn(columnName);
@@ -190,6 +191,16 @@ public abstract class Marshaller {
      */
     public abstract void writeObject(@Nullable Object obj, MarshallerWriter 
writer) throws MarshallerException;
 
+    /**
+     * Write the specified field of an object to a row.
+     *
+     * @param obj Object.
+     * @param writer Row writer.
+     * @param fldIdx Field index.
+     * @throws MarshallerException If failed.
+     */
+    public abstract void writeField(@Nullable Object obj, MarshallerWriter 
writer, int fldIdx) throws MarshallerException;
+
     /**
      * Marshaller for objects of natively supported types.
      */
@@ -226,6 +237,14 @@ public abstract class Marshaller {
         public void writeObject(@Nullable Object obj, MarshallerWriter writer) 
throws MarshallerException {
             fieldAccessor.write(writer, obj);
         }
+
+        /** {@inheritDoc} */
+        @Override
+        public void writeField(Object obj, MarshallerWriter writer, int 
fldIdx) throws MarshallerException {
+            assert fldIdx == 0;
+
+            fieldAccessor.write(writer, obj);
+        }
     }
 
     /**
@@ -274,6 +293,12 @@ public abstract class Marshaller {
                 fieldAccessors[fldIdx].write(writer, obj);
             }
         }
+
+        /** {@inheritDoc} */
+        @Override
+        public void writeField(@Nullable Object obj, MarshallerWriter writer, 
int fldIdx) throws MarshallerException {
+            fieldAccessors[fldIdx].write(writer, obj);
+        }
     }
 
     private static class NoOpMarshaller extends Marshaller {
@@ -282,6 +307,7 @@ public abstract class Marshaller {
             return null;
         }
 
+
         @Override
         public Object readObject(MarshallerReader reader, @Nullable Object 
target) {
             return null;
@@ -290,5 +316,10 @@ public abstract class Marshaller {
         @Override
         public void writeObject(Object obj, MarshallerWriter writer) {
         }
+
+        @Override
+        public void writeField(Object obj, MarshallerWriter writer, int 
fldIdx) throws MarshallerException {
+
+        }
     }
 }
diff --git 
a/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/MarshallerColumn.java
 
b/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/MarshallerColumn.java
index ffc90799cd..28faa4ce2e 100644
--- 
a/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/MarshallerColumn.java
+++ 
b/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/MarshallerColumn.java
@@ -21,6 +21,7 @@ import java.util.Objects;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.tostring.IgniteToStringExclude;
 import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
 
 /**
  * Marshaller column.
@@ -29,6 +30,8 @@ public class MarshallerColumn {
     /** Default "default value supplier". */
     private static final Supplier<Object> NULL_SUPPLIER = () -> null;
 
+    private final int schemaIndex;
+
     /**
      * Column name.
      */
@@ -56,24 +59,36 @@ public class MarshallerColumn {
      * @param name      Column name.
      * @param type      An instance of column data type.
      */
+    @TestOnly
     public MarshallerColumn(String name, BinaryMode type) {
-        this(name, type, null, 0);
+        this.schemaIndex = -1;
+        this.name = name;
+        this.type = type;
+        this.defValSup = NULL_SUPPLIER;
+        this.scale = 0;
     }
 
     /**
      * Constructor.
      *
+     * @param schemaIndex Field's position in a schema, or -1,
      * @param name      Column name.
      * @param type      An instance of column data type.
      * @param defValSup Default value supplier.
+     * @param scale     Scale of a decimal type if binary mode is decimal, or 
zero otherwise.
      */
-    public MarshallerColumn(String name, BinaryMode type, @Nullable 
Supplier<Object> defValSup, int scale) {
+    public MarshallerColumn(int schemaIndex, String name, BinaryMode type, 
@Nullable Supplier<Object> defValSup, int scale) {
+        this.schemaIndex = schemaIndex;
         this.name = name;
         this.type = type;
         this.defValSup = defValSup == null ? NULL_SUPPLIER : defValSup;
         this.scale = scale;
     }
 
+    public int schemaIndex() {
+        return schemaIndex;
+    }
+
     public String name() {
         return name;
     }
@@ -102,12 +117,12 @@ public class MarshallerColumn {
             return false;
         }
         MarshallerColumn that = (MarshallerColumn) o;
-        return scale == that.scale && Objects.equals(name, that.name) && type 
== that.type;
+        return schemaIndex == that.schemaIndex && scale == that.scale && 
Objects.equals(name, that.name) && type == that.type;
     }
 
     @Override
     public int hashCode() {
         // See comment in equals method.
-        return Objects.hash(name, type, scale);
+        return Objects.hash(schemaIndex, name, type, scale);
     }
 }
diff --git 
a/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/ReflectionMarshallersProvider.java
 
b/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/ReflectionMarshallersProvider.java
index 1843888342..f569ff815a 100644
--- 
a/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/ReflectionMarshallersProvider.java
+++ 
b/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/ReflectionMarshallersProvider.java
@@ -54,7 +54,7 @@ public class ReflectionMarshallersProvider implements 
MarshallersProvider {
     ) {
 
         MarshallerCacheKey key = new MarshallerCacheKey(
-                schema.schemaVersion(), MarshallerType.KEY_ONLY, mapper, 
requireAllFields, allowUnmappedFields
+                schema.schemaVersion(), MarshallerType.KEY_ONLY, 
schema.keys(), mapper, requireAllFields, allowUnmappedFields
         );
 
         return marshallerCache.getOrAdd(key, k -> {
@@ -71,7 +71,7 @@ public class ReflectionMarshallersProvider implements 
MarshallersProvider {
             boolean allowUnmappedFields) {
 
         MarshallerCacheKey key = new MarshallerCacheKey(
-                schema.schemaVersion(), MarshallerType.VALUE_ONLY, mapper, 
requireAllFields, allowUnmappedFields
+                schema.schemaVersion(), MarshallerType.VALUE_ONLY, 
schema.values(), mapper, requireAllFields, allowUnmappedFields
         );
 
         return marshallerCache.getOrAdd(key, k -> {
@@ -89,7 +89,7 @@ public class ReflectionMarshallersProvider implements 
MarshallersProvider {
     ) {
 
         MarshallerCacheKey key = new MarshallerCacheKey(
-                schema.schemaVersion(), MarshallerType.FULL_ROW, mapper, 
requireAllFields, allowUnmappedFields
+                schema.schemaVersion(), MarshallerType.FULL_ROW, schema.row(), 
mapper, requireAllFields, allowUnmappedFields
         );
 
         return marshallerCache.getOrAdd(key, k -> {
@@ -135,8 +135,6 @@ public class ReflectionMarshallersProvider implements 
MarshallersProvider {
     }
 
     private static final class MarshallerCacheKey {
-        private static final MarshallerColumn[] NO_COLUMNS = new 
MarshallerColumn[0];
-
         private final int schemaVersion;
 
         private final Mapper<?> mapper;
@@ -152,12 +150,13 @@ public class ReflectionMarshallersProvider implements 
MarshallersProvider {
         MarshallerCacheKey(
                 int schemaVersion,
                 MarshallerType type,
+                MarshallerColumn[] columns,
                 Mapper<?> mapper,
                 boolean requireAllFields,
                 boolean allowUnmappedFields
         ) {
             this.schemaVersion = schemaVersion;
-            this.columns = NO_COLUMNS;
+            this.columns = columns;
             this.type = type;
             this.mapper = mapper;
             this.requireAllFields = requireAllFields;
diff --git 
a/modules/marshaller-common/src/test/java/org/apache/ignite/internal/marshaller/ReflectionMarshallersProviderSelfTest.java
 
b/modules/marshaller-common/src/test/java/org/apache/ignite/internal/marshaller/ReflectionMarshallersProviderSelfTest.java
index c0e931ced1..1922c81d54 100644
--- 
a/modules/marshaller-common/src/test/java/org/apache/ignite/internal/marshaller/ReflectionMarshallersProviderSelfTest.java
+++ 
b/modules/marshaller-common/src/test/java/org/apache/ignite/internal/marshaller/ReflectionMarshallersProviderSelfTest.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.stream.Stream;
 import org.apache.ignite.table.mapper.Mapper;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 
@@ -37,10 +38,11 @@ public class ReflectionMarshallersProviderSelfTest {
     @EnumSource(MarshallerType.class)
     public void testMarshallerCache(MarshallerType marshallerType) {
         Mapper<TestPoJo> mapper = Mapper.of(TestPoJo.class);
+        List<String> fieldOrder = List.of("col1", "col2", "col3");
 
         // This test assumes that Mappers are cached.
 
-        TestMarshallerSchema schema1 = new MarshallerSchemaBuilder()
+        TestMarshallerSchema schema1 = new MarshallerSchemaBuilder(fieldOrder)
                 .version(1)
                 .addKey("col1", BinaryMode.INT)
                 .addValue("col2", BinaryMode.INT)
@@ -58,7 +60,7 @@ public class ReflectionMarshallersProviderSelfTest {
             assertNotSame(m2, m3);
         }
 
-        TestMarshallerSchema schema2 = new MarshallerSchemaBuilder()
+        TestMarshallerSchema schema2 = new MarshallerSchemaBuilder(fieldOrder)
                 .version(schema1.version + 1)
                 .addKey("col1", BinaryMode.INT)
                 .addValue("col2", BinaryMode.INT)
@@ -75,7 +77,7 @@ public class ReflectionMarshallersProviderSelfTest {
             assertNotSame(m1, m3);
         }
 
-        TestMarshallerSchema schema3 = new MarshallerSchemaBuilder()
+        TestMarshallerSchema schema3 = new MarshallerSchemaBuilder(fieldOrder)
                 .version(schema2.version + 1)
                 .addKey("col1", BinaryMode.INT)
                 .addValue("col2", BinaryMode.INT)
@@ -97,6 +99,60 @@ public class ReflectionMarshallersProviderSelfTest {
         }
     }
 
+    @Test
+    public void testKeyMarshallerCacheDiffOrder() {
+        Mapper<TestPoJo> mapper = Mapper.of(TestPoJo.class);
+        List<String> fieldOrder = List.of("col1", "col2", "col3", "col4");
+
+        TestMarshallerSchema schema1 = new MarshallerSchemaBuilder(fieldOrder)
+                .version(1)
+                .addKey("col1", BinaryMode.INT)
+                .addKey("col2", BinaryMode.INT)
+                .addValue("col3", BinaryMode.INT)
+                .addValue("col4", BinaryMode.INT)
+                .build();
+
+        TestMarshallerSchema schema2 = new 
MarshallerSchemaBuilder(List.of("col3", "col2", "col1", "col4"))
+                .version(1)
+                .addKey("col1", BinaryMode.INT)
+                .addKey("col2", BinaryMode.INT)
+                .addValue("col3", BinaryMode.INT)
+                .addValue("col4", BinaryMode.INT)
+                .build();
+
+        Marshaller m1 = MarshallerType.KEYS.get(marshallers, schema1, mapper, 
false, true);
+        Marshaller m2 = MarshallerType.KEYS.get(marshallers, schema2, mapper, 
false, true);
+
+        assertNotSame(m1, m2);
+    }
+
+    @Test
+    public void testValueMarshallerCacheDiffOrder() {
+        Mapper<TestPoJo> mapper = Mapper.of(TestPoJo.class);
+        List<String> fieldOrder = List.of("col1", "col2", "col3", "col4");
+
+        TestMarshallerSchema schema1 = new MarshallerSchemaBuilder(fieldOrder)
+                .version(1)
+                .addKey("col1", BinaryMode.INT)
+                .addKey("col2", BinaryMode.INT)
+                .addValue("col3", BinaryMode.INT)
+                .addValue("col4", BinaryMode.INT)
+                .build();
+
+        TestMarshallerSchema schema2 = new 
MarshallerSchemaBuilder(List.of("col1", "col4", "col3", "col3"))
+                .version(1)
+                .addKey("col1", BinaryMode.INT)
+                .addKey("col2", BinaryMode.INT)
+                .addValue("col3", BinaryMode.INT)
+                .addValue("col4", BinaryMode.INT)
+                .build();
+
+        Marshaller m1 = MarshallerType.KEYS.get(marshallers, schema1, mapper, 
false, true);
+        Marshaller m2 = MarshallerType.KEYS.get(marshallers, schema2, mapper, 
false, true);
+
+        assertNotSame(m1, m2);
+    }
+
     enum MarshallerType {
         /** Uses only key columns. */
         KEYS,
@@ -165,18 +221,26 @@ public class ReflectionMarshallersProviderSelfTest {
 
         private final List<MarshallerColumn> values = new ArrayList<>();
 
+        private final List<String> fieldOrder;
+
+        private MarshallerSchemaBuilder(List<String> fieldOrder) {
+            this.fieldOrder = fieldOrder;
+        }
+
         MarshallerSchemaBuilder version(int version) {
             this.version = version;
             return this;
         }
 
         MarshallerSchemaBuilder addKey(String name, BinaryMode binaryMode) {
-            keys.add(new MarshallerColumn(name.toUpperCase(Locale.US), 
binaryMode));
+            int schemaIndex = fieldOrder.indexOf(name);
+            keys.add(new MarshallerColumn(schemaIndex, 
name.toUpperCase(Locale.US), binaryMode, null, 0));
             return this;
         }
 
         MarshallerSchemaBuilder addValue(String name, BinaryMode binaryMode) {
-            values.add(new MarshallerColumn(name.toUpperCase(Locale.US), 
binaryMode));
+            int schemaIndex = fieldOrder.indexOf(name);
+            values.add(new MarshallerColumn(schemaIndex, 
name.toUpperCase(Locale.US), binaryMode, null, 0));
             return this;
         }
 
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRowConverter.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRowConverter.java
index f770f394c4..8915ad5d19 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRowConverter.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRowConverter.java
@@ -32,6 +32,7 @@ import 
org.apache.ignite.internal.binarytuple.BinaryTupleParser;
 import org.apache.ignite.internal.binarytuple.BinaryTupleParser.Sink;
 import org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
 import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
 
 /**
  * Utility to convert {@link BinaryRow} to {@link BinaryTuple} with specified 
columns set.
@@ -154,18 +155,26 @@ public class BinaryRowConverter implements 
ColumnsExtractor {
         throw new InvalidTypeException("Unexpected type value: " + 
element.typeSpec());
     }
 
+    /**
+     * Returns destination tuple schema.
+     */
+    @TestOnly
+    public BinaryTupleSchema dstSchema() {
+        return dstSchema;
+    }
+
     /** Helper method to convert from a full row or key-only row to the 
key-only tuple. */
-    public static ColumnsExtractor keyExtractor(SchemaDescriptor schema) {
+    public static BinaryRowConverter keyExtractor(SchemaDescriptor schema) {
         BinaryTupleSchema rowSchema = 
BinaryTupleSchema.createRowSchema(schema);
-        BinaryTupleSchema keySchema = 
BinaryTupleSchema.createKeySchema(schema);
+        BinaryTupleSchema keySchema = 
BinaryTupleSchema.createDestinationKeySchema(schema);
 
         return new BinaryRowConverter(rowSchema, keySchema);
     }
 
     /** Helper method to convert from a full row or key-only row to the tuple 
with specified columns. */
-    public static ColumnsExtractor columnsExtractor(SchemaDescriptor schema, 
int... columns) {
-        BinaryTupleSchema trimmedSchema = 
BinaryTupleSchema.createSchema(schema, columns);
+    public static BinaryRowConverter columnsExtractor(SchemaDescriptor schema, 
int... columns) {
         BinaryTupleSchema rowSchema = 
BinaryTupleSchema.createRowSchema(schema);
+        BinaryTupleSchema trimmedSchema = 
BinaryTupleSchema.createSchema(schema, columns);
 
         return new BinaryRowConverter(rowSchema, trimmedSchema);
     }
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTupleSchema.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTupleSchema.java
index 7485fe32ae..80b873b644 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTupleSchema.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTupleSchema.java
@@ -24,6 +24,7 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.util.BitSet;
+import java.util.List;
 import java.util.UUID;
 import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
 import org.apache.ignite.internal.binarytuple.BinaryTupleFormatException;
@@ -194,7 +195,38 @@ public class BinaryTupleSchema {
      * @return Tuple schema.
      */
     public static BinaryTupleSchema createKeySchema(SchemaDescriptor 
descriptor) {
-        return createSchema(descriptor, 0, descriptor.keyColumns().size());
+        List<Column> columns = descriptor.keyColumns();
+        Element[] elements = new Element[columns.size()];
+
+        for (int i = 0; i < columns.size(); i++) {
+            Column column = columns.get(i);
+            elements[i] = new Element(column.type(), column.nullable());
+        }
+
+        // Key schema can be converted into a key-only tuple, so this schema 
should be have convertible = true
+        return new DenseRowSchema(elements, 0, true);
+    }
+
+    /**
+     * Creates a schema for binary tuples that should be used to place key 
columns into a row.
+     * Unlike {@link #createKeySchema(SchemaDescriptor)} this schema is not 
convertible, because
+     * key columns might be located at arbitrary positions and in 
non-consecutive manner.
+     *
+     * @param descriptor Row schema.
+     * @return Tuple schema.
+     */
+    public static BinaryTupleSchema 
createDestinationKeySchema(SchemaDescriptor descriptor) {
+        List<Column> columns = descriptor.keyColumns();
+        Element[] elements = new Element[columns.size()];
+        int[] positions = new int[columns.size()];
+
+        for (int i = 0; i < columns.size(); i++) {
+            Column column = columns.get(i);
+            elements[i] = new Element(column.type(), column.nullable());
+            positions[i] = column.positionInRow();
+        }
+
+        return new SparseRowSchema(elements, positions);
     }
 
     /**
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/KvMarshaller.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/KvMarshaller.java
index bb9224ff2a..2bfc30e94f 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/KvMarshaller.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/KvMarshaller.java
@@ -53,6 +53,15 @@ public interface KvMarshaller<K, V> {
      */
     Row marshal(K key, @Nullable V val) throws MarshallerException;
 
+    /**
+     * Unmarshal given key-only row to a key object.
+     *
+     * @param row Table row.
+     * @return Key object.
+     * @throws MarshallerException If failed to unmarshal row.
+     */
+    K unmarshalKeyOnly(Row row) throws MarshallerException;
+
     /**
      * Unmarshal given row to a key object.
      *
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/MarshallerUtil.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/MarshallerUtil.java
index 57472d68e0..85b15d09cf 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/MarshallerUtil.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/MarshallerUtil.java
@@ -139,6 +139,7 @@ public final class MarshallerUtil {
         NativeType columnType = column.type();
 
         return new MarshallerColumn(
+                column.positionInRow(),
                 column.name(),
                 mode(columnType),
                 column.defaultValueProvider()::get,
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/KvMarshallerImpl.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/KvMarshallerImpl.java
index c3a1f1922f..4435faee43 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/KvMarshallerImpl.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/KvMarshallerImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.schema.marshaller.reflection;
 
+import java.util.List;
 import org.apache.ignite.internal.marshaller.Marshaller;
 import org.apache.ignite.internal.marshaller.MarshallerException;
 import org.apache.ignite.internal.marshaller.MarshallerSchema;
@@ -51,6 +52,12 @@ public class KvMarshallerImpl<K, V> implements 
KvMarshaller<K, V> {
     /** Value type. */
     private final Class<V> valClass;
 
+    /** Positions of key fields in the schema. */
+    private final int[] keyPositions;
+
+    /** Positions of value fields in the schema. */
+    private final int[] valPositions;
+
     /**
      * Creates KV marshaller.
      *
@@ -68,6 +75,8 @@ public class KvMarshallerImpl<K, V> implements 
KvMarshaller<K, V> {
         MarshallerSchema marshallerSchema = schema.marshallerSchema();
         keyMarsh = marshallers.getKeysMarshaller(marshallerSchema, keyMapper, 
true, false);
         valMarsh = marshallers.getValuesMarshaller(marshallerSchema, 
valueMapper, true, false);
+        keyPositions = 
schema.keyColumns().stream().mapToInt(Column::positionInRow).toArray();
+        valPositions = 
schema.valueColumns().stream().mapToInt(Column::positionInRow).toArray();
     }
 
     /** {@inheritDoc} */
@@ -94,19 +103,27 @@ public class KvMarshallerImpl<K, V> implements 
KvMarshaller<K, V> {
         assert keyClass.isInstance(key);
         assert val == null || valClass.isInstance(val);
 
+        List<Column> columns = schema.columns();
         RowAssembler asm = createAssembler(key, val);
 
         var writer = new RowWriter(asm);
 
-        keyMarsh.writeObject(key, writer);
-        valMarsh.writeObject(val, writer);
+        for (Column column : columns) {
+            if (column.positionInKey() >= 0) {
+                keyMarsh.writeField(key, writer, column.positionInKey());
+            } else {
+                valMarsh.writeField(val, writer, column.positionInValue());
+            }
+        }
 
         return Row.wrapBinaryRow(schema, asm.build());
     }
 
     /** {@inheritDoc} */
     @Override
-    public K unmarshalKey(Row row) throws MarshallerException {
+    public K unmarshalKeyOnly(Row row) throws MarshallerException {
+        assert row.elementCount() == keyPositions.length : "Number of key 
columns does not match";
+
         Object o = keyMarsh.readObject(new RowReader(row), null);
 
         assert keyClass.isInstance(o);
@@ -114,11 +131,21 @@ public class KvMarshallerImpl<K, V> implements 
KvMarshaller<K, V> {
         return (K) o;
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public K unmarshalKey(Row row) throws MarshallerException {
+        Object o = keyMarsh.readObject(new RowReader(row, keyPositions), null);
+
+        assert keyClass.isInstance(o);
+
+        return (K) o;
+    }
+
     /** {@inheritDoc} */
     @Nullable
     @Override
     public V unmarshalValue(Row row) throws MarshallerException {
-        Object o = valMarsh.readObject(new RowReader(row, 
schema.keyColumns().size()), null);
+        Object o = valMarsh.readObject(new RowReader(row, valPositions), null);
 
         assert o == null || valClass.isInstance(o);
 
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/RowReader.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/RowReader.java
index ce8be43be3..ccd460679a 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/RowReader.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/RowReader.java
@@ -27,6 +27,7 @@ import java.util.BitSet;
 import java.util.UUID;
 import org.apache.ignite.internal.marshaller.MarshallerReader;
 import org.apache.ignite.internal.schema.row.Row;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Adapter from a {@link Row} to a {@link MarshallerReader}.
@@ -36,13 +37,27 @@ class RowReader implements MarshallerReader {
 
     private int index;
 
+    private final int @Nullable[] positions;
+
+    /**
+     * Constructor for a reader that reads row fields in consecutive order.
+     *
+     * @param row Row.
+     */
     RowReader(Row row) {
-        this(row, 0);
+        this(row, null);
     }
 
-    RowReader(Row row, int index) {
+    /**
+     * Constructor for a reader that can read row fields in an order specified 
by positions array. If positions array is not specified
+     * then this reader reads fields in consecutive order.
+     *
+     * @param row Row.
+     * @param positions Positions array that defines row field read order.
+     */
+    RowReader(Row row, int @Nullable[] positions) {
         this.row = row;
-        this.index = index;
+        this.positions = positions;
     }
 
     @Override
@@ -52,121 +67,150 @@ class RowReader implements MarshallerReader {
 
     @Override
     public boolean readBoolean() {
-        return row.booleanValue(index++);
+        int idx = nextSchemaIndex();
+        return row.booleanValue(idx);
     }
 
     @Override
     public Boolean readBooleanBoxed() {
-        return row.booleanValueBoxed(index++);
+        int idx = nextSchemaIndex();
+        return row.booleanValueBoxed(idx);
     }
 
     @Override
     public byte readByte() {
-        return row.byteValue(index++);
+        int idx = nextSchemaIndex();
+        return row.byteValue(idx);
     }
 
     @Override
     public Byte readByteBoxed() {
-        return row.byteValueBoxed(index++);
+        int idx = nextSchemaIndex();
+        return row.byteValueBoxed(idx);
     }
 
     @Override
     public short readShort() {
-        return row.shortValue(index++);
+        int idx = nextSchemaIndex();
+        return row.shortValue(idx);
     }
 
     @Override
     public Short readShortBoxed() {
-        return row.shortValueBoxed(index++);
+        int idx = nextSchemaIndex();
+        return row.shortValueBoxed(idx);
     }
 
     @Override
     public int readInt() {
-        return row.intValue(index++);
+        int idx = nextSchemaIndex();
+        return row.intValue(idx);
     }
 
     @Override
     public Integer readIntBoxed() {
-        return row.intValueBoxed(index++);
+        int idx = nextSchemaIndex();
+        return row.intValueBoxed(idx);
     }
 
     @Override
     public long readLong() {
-        return row.longValue(index++);
+        int idx = nextSchemaIndex();
+        return row.longValue(idx);
     }
 
     @Override
     public Long readLongBoxed() {
-        return row.longValueBoxed(index++);
+        int idx = nextSchemaIndex();
+        return row.longValueBoxed(idx);
     }
 
     @Override
     public float readFloat() {
-        return row.floatValue(index++);
+        int idx = nextSchemaIndex();
+        return row.floatValue(idx);
     }
 
     @Override
     public Float readFloatBoxed() {
-        return row.floatValueBoxed(index++);
+        int idx = nextSchemaIndex();
+        return row.floatValueBoxed(idx);
     }
 
     @Override
     public double readDouble() {
-        return row.doubleValue(index++);
+        int idx = nextSchemaIndex();
+        return row.doubleValue(idx);
     }
 
     @Override
     public Double readDoubleBoxed() {
-        return row.doubleValueBoxed(index++);
+        int idx = nextSchemaIndex();
+        return row.doubleValueBoxed(idx);
     }
 
     @Override
     public String readString() {
-        return row.stringValue(index++);
+        int idx = nextSchemaIndex();
+        return row.stringValue(idx);
     }
 
     @Override
     public UUID readUuid() {
-        return row.uuidValue(index++);
+        int idx = nextSchemaIndex();
+        return row.uuidValue(idx);
     }
 
     @Override
     public byte[] readBytes() {
-        return row.bytesValue(index++);
+        int idx = nextSchemaIndex();
+        return row.bytesValue(idx);
     }
 
     @Override
     public BitSet readBitSet() {
-        return row.bitmaskValue(index++);
+        int idx = nextSchemaIndex();
+        return row.bitmaskValue(idx);
     }
 
     @Override
     public BigInteger readBigInt() {
-        return row.numberValue(index++);
+        int idx = nextSchemaIndex();
+        return row.numberValue(idx);
     }
 
     @Override
     public BigDecimal readBigDecimal(int scale) {
-        return row.decimalValue(index++, scale);
+        int idx = nextSchemaIndex();
+        return row.decimalValue(idx);
     }
 
     @Override
     public LocalDate readDate() {
-        return row.dateValue(index++);
+        int idx = nextSchemaIndex();
+        return row.dateValue(idx);
     }
 
     @Override
     public LocalTime readTime() {
-        return row.timeValue(index++);
+        int idx = nextSchemaIndex();
+        return row.timeValue(idx);
     }
 
     @Override
     public Instant readTimestamp() {
-        return row.timestampValue(index++);
+        int idx = nextSchemaIndex();
+        return row.timestampValue(idx);
     }
 
     @Override
     public LocalDateTime readDateTime() {
-        return row.dateTimeValue(index++);
+        int idx = nextSchemaIndex();
+        return row.dateTimeValue(idx);
+    }
+
+    private int nextSchemaIndex() {
+        int i = index++;
+        return positions == null ? i : positions[i];
     }
 }
diff --git 
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/BinaryRowConverterTest.java
 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/BinaryRowConverterTest.java
new file mode 100644
index 0000000000..3919406ca2
--- /dev/null
+++ 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/BinaryRowConverterTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.nio.ByteBuffer;
+import java.time.LocalDate;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.Random;
+import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.type.NativeTypes;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** Tests for the {@link BinaryRowConverter} class. */
+public class BinaryRowConverterTest extends BaseIgniteAbstractTest {
+
+    private final Random random = new Random();
+
+    @BeforeEach
+    public void setup() {
+        long seed = System.nanoTime();
+        random.setSeed(seed);
+
+        log.info("Seed: {}", seed);
+    }
+
+    @Test
+    public void testColumnExtractor() {
+        List<Column> columnList = Arrays.asList(
+                new Column("C1".toUpperCase(Locale.ROOT), NativeTypes.INT32, 
false),
+                new Column("C2".toUpperCase(Locale.ROOT), NativeTypes.INT64, 
false),
+                new Column("C3".toUpperCase(Locale.ROOT), NativeTypes.STRING, 
false),
+                new Column("C4".toUpperCase(Locale.ROOT), NativeTypes.DATE, 
false)
+        );
+        SchemaDescriptor schema = new SchemaDescriptor(1, columnList, 
List.of("C2", "C4"), null);
+
+        int col1 = random.nextInt();
+        long col2 = random.nextLong();
+        String col3 = String.valueOf(random.nextInt());
+        LocalDate col4 = LocalDate.ofEpochDay(random.nextInt(10000));
+
+        ByteBuffer builder = new BinaryTupleBuilder(4, 128)
+                .appendInt(col1)
+                .appendLong(col2)
+                .appendString(col3)
+                .appendDate(col4)
+                .build();
+
+        BinaryRow binaryRow = new BinaryRowImpl(schema.version(), builder);
+
+        {
+            BinaryRowConverter columnsExtractor = 
BinaryRowConverter.columnsExtractor(schema, 0, 1);
+            BinaryTuple tuple = columnsExtractor.extractColumns(binaryRow);
+            BinaryTupleSchema dstSchema = columnsExtractor.dstSchema();
+
+            assertEquals(col1, dstSchema.value(tuple, 0));
+            assertEquals(col2, dstSchema.value(tuple, 1));
+        }
+
+        {
+            BinaryRowConverter columnsExtractor = 
BinaryRowConverter.columnsExtractor(schema, 1, 2);
+            BinaryTupleSchema dstSchema = columnsExtractor.dstSchema();
+
+            BinaryTuple tuple = columnsExtractor.extractColumns(binaryRow);
+            assertEquals(col2, dstSchema.value(tuple, 0));
+            assertEquals(col3, dstSchema.value(tuple, 1));
+        }
+
+        {
+            BinaryRowConverter columnsExtractor = 
BinaryRowConverter.columnsExtractor(schema, 1, 3);
+            BinaryTupleSchema dstSchema = columnsExtractor.dstSchema();
+
+            BinaryTuple tuple = columnsExtractor.extractColumns(binaryRow);
+            assertEquals(col2, dstSchema.value(tuple, 0));
+            assertEquals(col4, dstSchema.value(tuple, 1));
+        }
+    }
+
+    @Test
+    public void testKeyExtractor() {
+        List<Column> columnList = Arrays.asList(
+                new Column("C1".toUpperCase(Locale.ROOT), NativeTypes.INT32, 
false),
+                new Column("C2".toUpperCase(Locale.ROOT), NativeTypes.INT64, 
false),
+                new Column("C3".toUpperCase(Locale.ROOT), NativeTypes.STRING, 
false),
+                new Column("C4".toUpperCase(Locale.ROOT), NativeTypes.DATE, 
false)
+        );
+        SchemaDescriptor schema = new SchemaDescriptor(1, columnList, 
List.of("C2", "C4"), null);
+
+        int col1 = random.nextInt();
+        long col2 = random.nextLong();
+        String col3 = String.valueOf(random.nextInt());
+        LocalDate col4 = LocalDate.ofEpochDay(random.nextInt(10000));
+
+        ByteBuffer builder = new BinaryTupleBuilder(4, 128)
+                .appendInt(col1)
+                .appendLong(col2)
+                .appendString(col3)
+                .appendDate(col4)
+                .build();
+
+        BinaryRow binaryRow = new BinaryRowImpl(schema.version(), builder);
+
+        {
+            BinaryRowConverter columnsExtractor = 
BinaryRowConverter.keyExtractor(schema);
+            BinaryTupleSchema dstSchema = columnsExtractor.dstSchema();
+
+            BinaryTuple tuple = columnsExtractor.extractColumns(binaryRow);
+            assertEquals(col2, dstSchema.value(tuple, 0));
+            assertEquals(col4, dstSchema.value(tuple, 1));
+        }
+    }
+}
diff --git 
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/BinaryTupleSchemaTest.java
 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/BinaryTupleSchemaTest.java
index 376d403b0a..5ad1b22b25 100644
--- 
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/BinaryTupleSchemaTest.java
+++ 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/BinaryTupleSchemaTest.java
@@ -17,42 +17,110 @@
 
 package org.apache.ignite.internal.schema;
 
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.util.Arrays;
+import java.util.List;
 import java.util.Locale;
+import org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
 import org.apache.ignite.internal.type.NativeTypes;
 import org.junit.jupiter.api.Test;
 
 /** Tests for the {@link BinaryTupleSchema} class. */
 public class BinaryTupleSchemaTest {
 
-    private static final SchemaDescriptor SCHEMA = new SchemaDescriptor(1, new 
Column[]{
-            new Column("id".toUpperCase(Locale.ROOT), NativeTypes.INT32, 
false),
-    }, new Column[]{
-            new Column("val".toUpperCase(Locale.ROOT), NativeTypes.INT32, 
false),
-    });
-
     @Test
     public void rowSchema() {
-        BinaryTupleSchema schema = BinaryTupleSchema.createRowSchema(SCHEMA);
-        assertEquals(0, schema.columnIndex(0));
-        assertEquals(1, schema.columnIndex(1));
-        assertTrue(schema.convertible());
+        SchemaDescriptor schema = new SchemaDescriptor(1, Arrays.asList(
+                new Column("C1".toUpperCase(Locale.ROOT), NativeTypes.INT32, 
false),
+                new Column("C2".toUpperCase(Locale.ROOT), NativeTypes.INT32, 
false),
+                new Column("C3".toUpperCase(Locale.ROOT), NativeTypes.INT32, 
false),
+                new Column("C4".toUpperCase(Locale.ROOT), NativeTypes.INT32, 
false)
+        ), List.of("C2", "C4"), null);
+
+        BinaryTupleSchema rowSchema = 
BinaryTupleSchema.createRowSchema(schema);
+        assertEquals(0, rowSchema.columnIndex(0));
+        assertEquals(1, rowSchema.columnIndex(1));
+        assertTrue(rowSchema.convertible());
     }
 
     @Test
     public void keySchema() {
-        BinaryTupleSchema schema = BinaryTupleSchema.createKeySchema(SCHEMA);
-        assertEquals(0, schema.columnIndex(0));
-        assertTrue(schema.convertible());
+        {
+            SchemaDescriptor schema = new SchemaDescriptor(1, Arrays.asList(
+                    new Column("C1".toUpperCase(Locale.ROOT), 
NativeTypes.INT8, false),
+                    new Column("C2".toUpperCase(Locale.ROOT), 
NativeTypes.INT16, false),
+                    new Column("C3".toUpperCase(Locale.ROOT), 
NativeTypes.INT32, false),
+                    new Column("C4".toUpperCase(Locale.ROOT), 
NativeTypes.INT64, false)
+            ), List.of("C2", "C4"), null);
+
+            BinaryTupleSchema keySchema = 
BinaryTupleSchema.createKeySchema(schema);
+
+            assertEquals(0, keySchema.columnIndex(0));
+            assertEquals(1, keySchema.columnIndex(1));
+            assertTrue(keySchema.convertible());
+
+            expectColumnMatch(schema, keySchema, 1, 0);
+            expectColumnMatch(schema, keySchema, 3, 1);
+        }
+
+        {
+            SchemaDescriptor schema = new SchemaDescriptor(1, Arrays.asList(
+                    new Column("C1".toUpperCase(Locale.ROOT), 
NativeTypes.INT8, false),
+                    new Column("C2".toUpperCase(Locale.ROOT), 
NativeTypes.INT16, false),
+                    new Column("C3".toUpperCase(Locale.ROOT), 
NativeTypes.INT32, false),
+                    new Column("C4".toUpperCase(Locale.ROOT), 
NativeTypes.INT64, false)
+            ), List.of("C4", "C2"), null);
+
+            BinaryTupleSchema keySchema = 
BinaryTupleSchema.createKeySchema(schema);
+
+            assertEquals(0, keySchema.columnIndex(0));
+            assertEquals(1, keySchema.columnIndex(1));
+            assertTrue(keySchema.convertible());
+
+            expectColumnMatch(schema, keySchema, 3, 0);
+            expectColumnMatch(schema, keySchema, 1, 1);
+        }
     }
 
     @Test
     public void valueSchema() {
-        BinaryTupleSchema schema = BinaryTupleSchema.createValueSchema(SCHEMA);
-        assertEquals(1, schema.columnIndex(0));
-        assertFalse(schema.convertible());
+        SchemaDescriptor schema = new SchemaDescriptor(1, Arrays.asList(
+                new Column("C1".toUpperCase(Locale.ROOT), NativeTypes.INT8, 
false),
+                new Column("C2".toUpperCase(Locale.ROOT), NativeTypes.INT16, 
false)
+        ), List.of("C1"), null);
+
+        BinaryTupleSchema valueSchema = 
BinaryTupleSchema.createValueSchema(schema);
+
+        assertEquals(1, valueSchema.columnIndex(0));
+        assertFalse(valueSchema.convertible());
+    }
+
+    @Test
+    public void destinationKeySchema() {
+        SchemaDescriptor schema = new SchemaDescriptor(1, Arrays.asList(
+                new Column("C1".toUpperCase(Locale.ROOT), NativeTypes.INT8, 
false),
+                new Column("C2".toUpperCase(Locale.ROOT), NativeTypes.INT16, 
false),
+                new Column("C3".toUpperCase(Locale.ROOT), NativeTypes.INT32, 
false),
+                new Column("C4".toUpperCase(Locale.ROOT), NativeTypes.INT64, 
false)
+        ), List.of("C2", "C4"), null);
+
+        BinaryTupleSchema dstSchema = 
BinaryTupleSchema.createDestinationKeySchema(schema);
+
+        assertEquals(1, dstSchema.columnIndex(0));
+        assertEquals(3, dstSchema.columnIndex(1));
+        assertFalse(dstSchema.convertible());
+    }
+
+    private static void expectColumnMatch(SchemaDescriptor schema, 
BinaryTupleSchema tupleSchema, int schemaIdx, int tupleIdx) {
+        Column column = schema.column(schemaIdx);
+        Element elem = tupleSchema.element(tupleIdx);
+
+        String message = format("schema field: {}, tuple field: {}", 
schemaIdx, tupleIdx);
+        assertSame(column.type().spec(), elem.typeSpec, message);
     }
 }
diff --git 
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/KvMarshallerTest.java
 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/KvMarshallerTest.java
index 3a8a2d801c..ed12a78016 100644
--- 
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/KvMarshallerTest.java
+++ 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/KvMarshallerTest.java
@@ -55,11 +55,16 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.time.LocalDate;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Random;
 import java.util.Set;
@@ -67,12 +72,14 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import javax.annotation.processing.Generated;
+import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
 import org.apache.ignite.internal.marshaller.MarshallerException;
 import org.apache.ignite.internal.marshaller.SerializingConverter;
 import 
org.apache.ignite.internal.marshaller.testobjects.TestObjectWithAllTypes;
 import 
org.apache.ignite.internal.marshaller.testobjects.TestObjectWithNoDefaultConstructor;
 import 
org.apache.ignite.internal.marshaller.testobjects.TestObjectWithPrivateConstructor;
 import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryRowImpl;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaTestUtils;
@@ -535,6 +542,170 @@ public class KvMarshallerTest {
         return baos.toByteArray();
     }
 
+    @ParameterizedTest
+    @MethodSource("marshallerFactoryProvider")
+    public void testKeyColumnPlacement(MarshallerFactory marshallerFactory) 
throws MarshallerException {
+        Assumptions.assumeFalse(marshallerFactory instanceof 
AsmMarshallerGenerator);
+
+        Mapper<TestObjectKeyPart> keyMapper = 
Mapper.of(TestObjectKeyPart.class);
+        Mapper<TestObjectValPart> valueMapper = 
Mapper.of(TestObjectValPart.class);
+
+        // Types match TestObjectKeyPart, TestObjectValuePart
+        List<Column> columns = new ArrayList<>(List.of(
+                new Column("COL1", INT64, false),
+                new Column("COL2", INT32, false),
+                new Column("COL3", STRING, false),
+                new Column("COL4", BOOLEAN, false),
+                new Column("COL5", DATE, false)
+        ));
+
+        Collections.shuffle(columns, rnd);
+
+        List<String> keyColumns = columns.stream()
+                .map(Column::name)
+                .filter(c -> "COL2".equals(c) || "COL4".equals(c))
+                .collect(Collectors.toList());
+
+        SchemaDescriptor descriptor = new SchemaDescriptor(1, columns, 
keyColumns, null);
+
+        KvMarshaller<TestObjectKeyPart, TestObjectValPart> kvMarshaller = 
marshallerFactory.create(
+                descriptor,
+                keyMapper,
+                valueMapper
+        );
+
+        TestObjectKeyPart key = new TestObjectKeyPart();
+        key.col2 = rnd.nextInt();
+        key.col4 = rnd.nextBoolean();
+
+        TestObjectValPart val = new TestObjectValPart();
+        val.col1 = rnd.nextLong();
+        val.col3 = String.valueOf(rnd.nextInt());
+        val.col5 = LocalDate.ofEpochDay(rnd.nextInt(10_000));
+
+        Map<String, Object> columnNameToValue = new HashMap<>();
+        columnNameToValue.put("COL1", val.col1);
+        columnNameToValue.put("COL2", key.col2);
+        columnNameToValue.put("COL3", val.col3);
+        columnNameToValue.put("COL4", key.col4);
+        columnNameToValue.put("COL5", val.col5);
+
+        Map<String, Integer> columnNameToIdx = new HashMap<>();
+        for (int i = 0; i < columns.size(); i++) {
+            columnNameToIdx.put(columns.get(i).name(), i);
+        }
+
+        Map<Integer, Object> columnIdxToValue = new HashMap<>();
+        Map<Integer, Object> keyColumnIdxToValue = new HashMap<>();
+
+        for (int i = 0, j = 0; i < columns.size(); i++) {
+            String name = columns.get(i).name();
+            int idx = columnNameToIdx.get(name);
+            Object colValue = columnNameToValue.get(name);
+
+            columnIdxToValue.put(idx, colValue);
+
+            if (keyColumns.contains(name)) {
+                keyColumnIdxToValue.put(j, colValue);
+                j++;
+            }
+        }
+
+        // Check key only row
+
+        Row keyRow = kvMarshaller.marshal(key);
+        assertEquals(2, keyRow.elementCount());
+        assertEquals(keyColumnIdxToValue.get(0), keyRow.value(0));
+        assertEquals(keyColumnIdxToValue.get(1), keyRow.value(1));
+
+        // Check full row
+
+        Row fullRow = kvMarshaller.marshal(key, val);
+        assertEquals(fullRow.elementCount(), descriptor.length());
+        assertEquals(columnIdxToValue.get(0), fullRow.value(0));
+        assertEquals(columnIdxToValue.get(1), fullRow.value(1));
+        assertEquals(columnIdxToValue.get(2), fullRow.value(2));
+        assertEquals(columnIdxToValue.get(3), fullRow.value(3));
+        assertEquals(columnIdxToValue.get(4), fullRow.value(4));
+    }
+
+    static class TestObjectKeyPart {
+        int col2;
+        boolean col4;
+    }
+
+    static class TestObjectValPart {
+        long col1;
+        String col3;
+        LocalDate col5;
+    }
+
+    @ParameterizedTest
+    @MethodSource("marshallerFactoryProvider")
+    public void unmarshallKey(MarshallerFactory marshallerFactory) throws 
MarshallerException {
+        Assumptions.assumeFalse(marshallerFactory instanceof 
AsmMarshallerGenerator);
+
+        Mapper<TestObjectKeyPart> keyMapper = 
Mapper.of(TestObjectKeyPart.class);
+        Mapper<TestObjectValPart> valueMapper = 
Mapper.of(TestObjectValPart.class);
+
+        // Types match TestObjectKeyPart, TestObjectValuePart
+        List<Column> columns = new ArrayList<>(List.of(
+                new Column("COL1", INT64, false),
+                new Column("COL2", INT32, false),
+                new Column("COL3", STRING, false),
+                new Column("COL4", BOOLEAN, false),
+                new Column("COL5", DATE, false)
+        ));
+
+        SchemaDescriptor descriptor = new SchemaDescriptor(1, columns, 
List.of("COL2", "COL4"), null);
+
+        KvMarshaller<TestObjectKeyPart, TestObjectValPart> marshaller = 
marshallerFactory.create(
+                descriptor,
+                keyMapper,
+                valueMapper
+        );
+
+        TestObjectKeyPart key = new TestObjectKeyPart();
+        key.col2 = rnd.nextInt();
+        key.col4 = rnd.nextBoolean();
+
+        TestObjectValPart val = new TestObjectValPart();
+        val.col1 = rnd.nextLong();
+        val.col3 = String.valueOf(rnd.nextInt());
+        val.col5 = LocalDate.ofEpochDay(rnd.nextInt(10_000));
+
+        // Key only row
+        {
+            ByteBuffer tupleBuf = new 
BinaryTupleBuilder(descriptor.keyColumns().size(), 128)
+                    .appendInt(key.col2)
+                    .appendBoolean(key.col4)
+                    .build();
+
+            BinaryRow row = new BinaryRowImpl(descriptor.version(), tupleBuf);
+
+            TestObjectKeyPart keyPart = 
marshaller.unmarshalKeyOnly(Row.wrapKeyOnlyBinaryRow(descriptor, row));
+            assertEquals(key.col2, keyPart.col2);
+            assertEquals(key.col4, keyPart.col4);
+        }
+
+        // full row
+        {
+            ByteBuffer tupleBuf = new BinaryTupleBuilder(descriptor.length(), 
128)
+                    .appendLong(val.col1)
+                    .appendLong(key.col2)
+                    .appendString(val.col3)
+                    .appendBoolean(key.col4)
+                    .appendDate(val.col5)
+                    .build();
+
+            BinaryRow row = new BinaryRowImpl(descriptor.version(), tupleBuf);
+
+            TestObjectKeyPart keyPart = 
marshaller.unmarshalKey(Row.wrapBinaryRow(descriptor, row));
+            assertEquals(key.col2, keyPart.col2);
+            assertEquals(key.col4, keyPart.col4);
+        }
+    }
+
     /**
      * Generate random key-value pair of given types and check serialization 
and deserialization works fine.
      *
@@ -556,8 +727,8 @@ public class KvMarshallerTest {
         KvMarshaller<Object, Object> marshaller = factory.create(schema,
                 Mapper.of((Class<Object>) key.getClass(), "\"key\""),
                 Mapper.of((Class<Object>) val.getClass(), "\"val\""));
-        Row row = Row.wrapBinaryRow(schema, marshaller.marshal(key, val));
 
+        Row row = Row.wrapBinaryRow(schema, marshaller.marshal(key, val));
         Object key1 = marshaller.unmarshalKey(row);
         Object val1 = marshaller.unmarshalValue(row);
 
diff --git 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItKvKeyColumnPositionTest.java
 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItKvKeyColumnPositionTest.java
new file mode 100644
index 0000000000..63094924e5
--- /dev/null
+++ 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItKvKeyColumnPositionTest.java
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.api;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import java.time.LocalDate;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.schema.marshaller.KvMarshaller;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.lang.NullableValue;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.manager.IgniteTables;
+import org.apache.ignite.table.mapper.Mapper;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Named;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Test for KV view with different key column placement.
+ */
+public class ItKvKeyColumnPositionTest extends BaseSqlIntegrationTest {
+
+    private static final AtomicInteger ID_NUM = new AtomicInteger();
+
+    // server
+
+    private KeyValueView<IntString, BoolInt> serverValKey;
+
+    private KeyValueView<IntString, BoolInt> serverKeyVal;
+
+    private KeyValueView<IntString, BoolInt> serverValKeyFlipped;
+
+    private KeyValueView<IntString, BoolInt> serverKeyValFlipped;
+
+    private KeyValueView<String, IntBoolDate> serverSimpleKeyVal;
+
+    private KeyValueView<String, IntBoolDate> serverSimpleValKey;
+
+    // client
+    // TODO https://issues.apache.org/jira/browse/IGNITE-21768
+    @SuppressWarnings("unused")
+    private KeyValueView<IntString, BoolInt> clientValKey;
+
+    @SuppressWarnings("unused")
+    private KeyValueView<IntString, BoolInt> clientKeyVal;
+
+    @SuppressWarnings("unused")
+    private KeyValueView<IntString, BoolInt> clientValKeyFlipped;
+
+    @SuppressWarnings("unused")
+    private KeyValueView<IntString, BoolInt> clientKeyValFlipped;
+
+    private KeyValueView<String, IntBoolDate> clientSimpleKeyVal;
+
+    @SuppressWarnings("unused")
+    private KeyValueView<String, IntBoolDate> clientSimpleValKey;
+
+    private IgniteClient client;
+
+    @BeforeAll
+    public void setup() {
+        sql("CREATE TABLE key_val (intCol INT, boolCol BOOLEAN, dateCol DATE, 
strCol VARCHAR, PRIMARY KEY (intCol, strCol))");
+        sql("CREATE TABLE key_val_flip (intCol INT, boolCol BOOLEAN, dateCol 
DATE, strCol VARCHAR, PRIMARY KEY (strCol, intCol))");
+
+        sql("CREATE TABLE val_key (boolCol BOOLEAN, intCol INT, dateCol DATE, 
strCol VARCHAR, PRIMARY KEY (intCol, strCol))");
+        sql("CREATE TABLE val_key_flip (boolCol BOOLEAN, intCol INT, dateCol 
DATE, strCol VARCHAR, PRIMARY KEY (strCol, intCol))");
+
+        sql("CREATE TABLE simple_key_val (strCol VARCHAR, boolCol BOOLEAN, 
dateCol DATE, intCol INT, PRIMARY KEY (strCol))");
+        sql("CREATE TABLE simple_val_key (boolCol BOOLEAN, intCol INT, dateCol 
DATE, strCol VARCHAR, PRIMARY KEY (strCol))");
+
+        // SERVER
+        IgniteImpl igniteImpl = CLUSTER.aliveNode();
+
+        IgniteTables serverTables = igniteImpl.tables();
+        serverKeyVal = 
serverTables.table("key_val").keyValueView(IntString.class, BoolInt.class);
+        serverKeyValFlipped = 
serverTables.table("key_val_flip").keyValueView(IntString.class, BoolInt.class);
+        serverValKey = 
serverTables.table("val_key").keyValueView(IntString.class, BoolInt.class);
+        serverValKeyFlipped = 
serverTables.table("val_key_flip").keyValueView(IntString.class, BoolInt.class);
+
+        serverSimpleKeyVal = 
serverTables.table("simple_key_val").keyValueView(Mapper.of(String.class), 
Mapper.of(IntBoolDate.class));
+        serverSimpleValKey = 
serverTables.table("simple_val_key").keyValueView(Mapper.of(String.class), 
Mapper.of(IntBoolDate.class));
+
+        // CLIENT
+
+        String addressString = "127.0.0.1:" + 
igniteImpl.clientAddress().port();
+
+        client = IgniteClient.builder().addresses(addressString).build();
+
+        IgniteTables clientTables = client.tables();
+        clientKeyValFlipped = 
clientTables.table("key_val_flip").keyValueView(IntString.class, BoolInt.class);
+        clientKeyVal = 
clientTables.table("key_val").keyValueView(IntString.class, BoolInt.class);
+        clientValKey = 
clientTables.table("val_key").keyValueView(IntString.class, BoolInt.class);
+        clientValKeyFlipped = 
clientTables.table("val_key_flip").keyValueView(IntString.class, BoolInt.class);
+
+        clientSimpleKeyVal = 
clientTables.table("simple_key_val").keyValueView(Mapper.of(String.class), 
Mapper.of(IntBoolDate.class));
+        clientSimpleValKey = 
clientTables.table("simple_val_key").keyValueView(Mapper.of(String.class), 
Mapper.of(IntBoolDate.class));
+    }
+
+    @Override
+    protected int initialNodes() {
+        return 1;
+    }
+
+    @AfterAll
+    public void closeClient() throws Exception {
+        client.close();
+    }
+
+    private List<Arguments> complexKeyKvs() {
+        // TODO https://issues.apache.org/jira/browse/IGNITE-21768
+        // Arguments.of(Named.named("client key_val_key", clientKeyVal)),
+        // Arguments.of(Named.named("client key_val_key_flipped", 
clientKeyValFlipped)),
+        // Arguments.of(Named.named("client val_key_val_key", clientValKey)),
+        // Arguments.of(Named.named("client val_key_val_key", 
clientValKeyFlipped))
+
+        return List.of(
+                Arguments.of(Named.named("server key_val_key", serverKeyVal)),
+                Arguments.of(Named.named("server key_val_key_flipped", 
serverKeyValFlipped)),
+                Arguments.of(Named.named("server val_key_val_key", 
serverValKey)),
+                Arguments.of(Named.named("server val_key_val_key_flipped", 
serverValKeyFlipped))
+        );
+    }
+
+    private List<Arguments> simpleKeyKvs() {
+        // TODO https://issues.apache.org/jira/browse/IGNITE-21768
+        // Arguments.of(Named.named("client simple val key", 
clientSimpleValKey))
+
+        return List.of(
+                Arguments.of(Named.named("server simple key val", 
serverSimpleKeyVal)),
+                Arguments.of(Named.named("server simple val key", 
serverSimpleValKey)),
+                Arguments.of(Named.named("client simple key val", 
clientSimpleKeyVal))
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("complexKeyKvs")
+    public void testPutGet(KeyValueView<IntString, BoolInt> kvView) {
+        {
+            IntString key = newKey();
+
+            BoolInt val = new BoolInt();
+            val.boolCol = true;
+            val.dateCol = LocalDate.now();
+
+            kvView.put(null, key, val);
+
+            BoolInt retrieved = kvView.get(null, key);
+            assertEquals(val, retrieved);
+        }
+
+        {
+            IntString key = newKey();
+
+            BoolInt val = new BoolInt();
+            val.boolCol = true;
+
+            kvView.put(null, key, val);
+
+            BoolInt retrieved = kvView.get(null, key);
+            assertEquals(val, retrieved);
+        }
+    }
+
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-21836";)
+    @ParameterizedTest
+    @MethodSource("nullableKvsSimple")
+    public void testNotNullableGetSimple(KeyValueView<String, IntBoolDate> 
kvView) {
+        String key = String.valueOf(ID_NUM.incrementAndGet());
+        kvView.put(null, key, null);
+
+        NullableValue<IntBoolDate> nullable = kvView.getNullable(null, key);
+        assertNull(nullable.get());
+    }
+
+    private List<Arguments> nullableKvsSimple() {
+        // TODO https://issues.apache.org/jira/browse/IGNITE-21768
+        // Arguments.of(Named.named("client", clientSimpleKeyVal))
+
+        return List.of(
+                Arguments.of(Named.named("server", serverSimpleValKey))
+        );
+    }
+
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-21836";)
+    @ParameterizedTest
+    @MethodSource("complexKeyKvs")
+    public void testNotNullableGetComplex(KeyValueView<IntString, BoolInt> 
kvView) {
+        IntString key = newKey();
+        kvView.put(null, key, null);
+
+        NullableValue<BoolInt> nullable = kvView.getNullable(null, key);
+        assertNull(nullable.get());
+    }
+
+    @ParameterizedTest
+    @MethodSource("simpleKeyKvs")
+    public void testSimplePutGet(KeyValueView<String, IntBoolDate> kvView) {
+        IntBoolDate val = new IntBoolDate();
+        val.boolCol = true;
+        val.intCol = (int) System.nanoTime();
+        val.dateCol = 
LocalDate.ofEpochDay(ThreadLocalRandom.current().nextLong(1000));
+
+        kvView.put(null, "1", val);
+
+        IntBoolDate retrieved = kvView.get(null, "1");
+        assertEquals(val, retrieved);
+    }
+
+    /**
+     * Checks remove all since this API calls {@link 
KvMarshaller#unmarshalKeyOnly(Row)}.
+     */
+    @ParameterizedTest
+    @MethodSource("complexKeyKvs")
+    public void testRemoveAll(KeyValueView<IntString, BoolInt> kvView) {
+        IntString k1 = newKey();
+        IntString k2 = newKey();
+
+        kvView.put(null, k1, new BoolInt());
+        kvView.put(null, k2, new BoolInt());
+
+        kvView.removeAll(null, List.of(k1, k2));
+
+        assertNull(kvView.get(null, k1));
+        assertNull(kvView.get(null, k2));
+    }
+
+    /**
+     * Checks remove all since this API calls {@link 
KvMarshaller#marshal(Object, Object)}}.
+     */
+    @ParameterizedTest
+    @MethodSource("complexKeyKvs")
+    public void testPutAll(KeyValueView<IntString, BoolInt> kvView) {
+        IntString key1 = newKey();
+        BoolInt val1 = new BoolInt();
+        val1.dateCol = 
LocalDate.now().plusDays(ThreadLocalRandom.current().nextInt(100));
+
+        IntString key2 = newKey();
+        BoolInt val2 = new BoolInt();
+        val2.dateCol = 
LocalDate.now().plusDays(ThreadLocalRandom.current().nextInt(100));
+
+        kvView.putAll(null, Map.of(key1, val1, key2, val2));
+
+        BoolInt retrieved1 = kvView.get(null, key1);
+        assertEquals(val1, retrieved1);
+
+        BoolInt retrieved2 = kvView.get(null, key2);
+        assertEquals(val2, retrieved2);
+    }
+
+    @ParameterizedTest
+    @MethodSource("complexKeyKvs")
+    public void testGetAll(KeyValueView<IntString, BoolInt> kvView) {
+        IntString key1 = newKey();
+        BoolInt val1 = new BoolInt();
+        val1.dateCol = 
LocalDate.now().plusDays(ThreadLocalRandom.current().nextInt(100));
+        kvView.put(null, key1, val1);
+
+        IntString key2 = newKey();
+        BoolInt val2 = new BoolInt();
+        val2.dateCol = 
LocalDate.now().plusDays(ThreadLocalRandom.current().nextInt(100));
+
+        kvView.put(null, key2, val2);
+
+        Map<IntString, BoolInt> all = kvView.getAll(null, List.of(key1, key2));
+        assertEquals(Map.of(key1, val1, key2, val2), all);
+    }
+
+    private static IntString newKey() {
+        int val = ID_NUM.incrementAndGet();
+
+        IntString key = new IntString();
+        key.intCol = val;
+        key.strCol = Integer.toString(val);
+        return key;
+    }
+
+    static class IntString {
+        @IgniteToStringInclude
+        int intCol;
+        @IgniteToStringInclude
+        String strCol;
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            IntString intString = (IntString) o;
+            return intCol == intString.intCol && Objects.equals(strCol, 
intString.strCol);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(intCol, strCol);
+        }
+
+        @Override
+        public String toString() {
+            return S.toString(this);
+        }
+    }
+
+    static class BoolInt {
+        @IgniteToStringInclude
+        boolean boolCol;
+        @IgniteToStringInclude
+        LocalDate dateCol;
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            BoolInt boolInt = (BoolInt) o;
+            return boolCol == boolInt.boolCol && Objects.equals(dateCol, 
boolInt.dateCol);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(boolCol, dateCol);
+        }
+
+        @Override
+        public String toString() {
+            return S.toString(this);
+        }
+    }
+
+    static class IntBoolDate {
+        @IgniteToStringInclude
+        int intCol;
+        @IgniteToStringInclude
+        boolean boolCol;
+        @IgniteToStringInclude
+        LocalDate dateCol;
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            IntBoolDate that = (IntBoolDate) o;
+            return intCol == that.intCol && boolCol == that.boolCol && 
Objects.equals(dateCol, that.dateCol);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(intCol, boolCol, dateCol);
+        }
+
+        @Override
+        public String toString() {
+            return S.toString(this);
+        }
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
index 8688f060f5..2ca8f98a18 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
@@ -602,7 +602,7 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
         try {
             for (Row row : rowConverter.resolveKeys(rows, schemaVersion)) {
                 if (row != null) {
-                    keys.add(marsh.unmarshalKey(row));
+                    keys.add(marsh.unmarshalKeyOnly(row));
                 }
             }
 
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index 3e6f8d6b66..bae54ec3af 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -178,9 +178,9 @@ public class PartitionReplicaListenerIndexLockingTest 
extends IgniteAbstractTest
         IndexLocker hashIndexLocker = new HashIndexLocker(HASH_INDEX_ID, 
false, LOCK_MANAGER, row2HashKeyConverter);
 
         BinaryTupleSchema rowSchema = 
BinaryTupleSchema.createRowSchema(schemaDescriptor);
-        BinaryTupleSchema valueSchema = 
BinaryTupleSchema.createValueSchema(schemaDescriptor);
+        BinaryTupleSchema keySchema = 
BinaryTupleSchema.createKeySchema(schemaDescriptor);
 
-        row2SortKeyConverter = new BinaryRowConverter(rowSchema, valueSchema);
+        row2SortKeyConverter = new BinaryRowConverter(rowSchema, keySchema);
 
         TableSchemaAwareIndexStorage sortedIndexStorage = new 
TableSchemaAwareIndexStorage(
                 SORTED_INDEX_ID,
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 3114868f88..cd3257ff14 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -2708,7 +2708,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
 
     private TestKey key(BinaryRow binaryRow) {
         try {
-            return 
kvMarshaller.unmarshalKey(Row.wrapKeyOnlyBinaryRow(schemaDescriptor, 
binaryRow));
+            return 
kvMarshaller.unmarshalKeyOnly(Row.wrapKeyOnlyBinaryRow(schemaDescriptor, 
binaryRow));
         } catch (MarshallerException e) {
             throw new AssertionError(e);
         }

Reply via email to