This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 593161d287 IGNITE-16094 Add caching for Tuple marshallers (#3146)
593161d287 is described below

commit 593161d2871cbe6ab2550c2a51e35a6326f875d4
Author: Max Zhuravkov <[email protected]>
AuthorDate: Thu Feb 8 16:28:20 2024 +0200

    IGNITE-16094 Add caching for Tuple marshallers (#3146)
---
 .../org/apache/ignite/table/mapper/Mapper.java     |   1 -
 .../apache/ignite/table/mapper/MapperBuilder.java  |   1 +
 .../ignite/internal/client/TcpIgniteClient.java    |   8 +-
 .../internal/client/sql/ClientAsyncResultSet.java  |  12 +-
 .../ignite/internal/client/sql/ClientSession.java  |   8 +-
 .../internal/client/sql/ClientSessionBuilder.java  |  18 +-
 .../ignite/internal/client/sql/ClientSql.java      |  12 +-
 .../internal/client/table/AbstractClientView.java  |   2 +-
 .../ignite/internal/client/table/ClientSchema.java |  86 ++++++++-
 .../ignite/internal/client/table/ClientTable.java  |  19 +-
 .../ignite/internal/client/table/ClientTables.java |  11 +-
 .../org/apache/ignite/client/ClientTupleTest.java  |   7 +-
 .../ignite/internal/index/IndexManagerTest.java    |   4 +-
 modules/marshaller-common/build.gradle             |   1 +
 .../ignite/internal/marshaller/Marshaller.java     |   2 +-
 .../internal/marshaller/MarshallerColumn.java      |  22 +++
 .../internal/marshaller/MarshallerSchema.java      |  33 ++++
 .../internal/marshaller/MarshallersProvider.java   |  87 +++++++++
 .../marshaller/ReflectionMarshallersProvider.java  | 202 ++++++++++++++++++++
 .../ReflectionMarshallersProviderSelfTest.java     | 209 +++++++++++++++++++++
 .../internal/table/ItThinClientColocationTest.java |   5 +-
 .../ignite/internal/schema/SchemaDescriptor.java   |  59 ++++++
 .../marshaller/reflection/KvMarshallerImpl.java    |  12 +-
 .../reflection/RecordMarshallerImpl.java           |  19 +-
 .../reflection/ReflectionMarshallerFactory.java    |   8 +-
 .../schema/marshaller/KvMarshallerTest.java        |  30 +--
 .../schema/marshaller/RecordMarshallerTest.java    |  24 ++-
 .../ignite/internal/table/AbstractTableView.java   |   9 +-
 .../internal/table/KeyValueBinaryViewImpl.java     |  12 +-
 .../ignite/internal/table/KeyValueViewImpl.java    |  18 +-
 .../internal/table/RecordBinaryViewImpl.java       |  12 +-
 .../ignite/internal/table/RecordViewImpl.java      |  16 +-
 .../apache/ignite/internal/table/TableImpl.java    |  26 ++-
 .../internal/table/distributed/TableManager.java   |   6 +-
 .../table/KeyValueBinaryViewOperationsTest.java    |   7 +-
 .../internal/table/KeyValueViewOperationsTest.java |   7 +-
 .../table/RecordBinaryViewOperationsTest.java      |   4 +-
 .../internal/table/RecordViewOperationsTest.java   |   7 +-
 38 files changed, 922 insertions(+), 104 deletions(-)

diff --git 
a/modules/api/src/main/java/org/apache/ignite/table/mapper/Mapper.java 
b/modules/api/src/main/java/org/apache/ignite/table/mapper/Mapper.java
index b2a700a1b9..11e8014242 100644
--- a/modules/api/src/main/java/org/apache/ignite/table/mapper/Mapper.java
+++ b/modules/api/src/main/java/org/apache/ignite/table/mapper/Mapper.java
@@ -53,7 +53,6 @@ public interface Mapper<T> {
      */
     static <O> Mapper<O> of(Class<O> type) {
         if (nativelySupported(type)) {
-            // TODO: Cache mappers (IGNITE-16094).
             return new OneColumnMapperImpl<>(type, null, null);
         } else {
             return builder(type).automap().build();
diff --git 
a/modules/api/src/main/java/org/apache/ignite/table/mapper/MapperBuilder.java 
b/modules/api/src/main/java/org/apache/ignite/table/mapper/MapperBuilder.java
index 410e99300d..96b78154a7 100644
--- 
a/modules/api/src/main/java/org/apache/ignite/table/mapper/MapperBuilder.java
+++ 
b/modules/api/src/main/java/org/apache/ignite/table/mapper/MapperBuilder.java
@@ -41,6 +41,7 @@ import org.apache.ignite.lang.util.IgniteNameUtils;
  * @param <T> Type of objects the mapper handles.
  */
 public final class MapperBuilder<T> {
+
     /** Target type. */
     private final Class<T> targetType;
 
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
index 4ba861d2e3..76156275b6 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.client.sql.ClientSql;
 import org.apache.ignite.internal.client.table.ClientTables;
 import org.apache.ignite.internal.client.tx.ClientTransactions;
 import org.apache.ignite.internal.jdbc.proto.ClientMessage;
+import org.apache.ignite.internal.marshaller.ReflectionMarshallersProvider;
 import org.apache.ignite.internal.metrics.MetricManager;
 import org.apache.ignite.internal.metrics.exporters.jmx.JmxExporter;
 import org.apache.ignite.lang.ErrorGroups;
@@ -71,6 +72,9 @@ public class TcpIgniteClient implements IgniteClient {
     /** Metrics. */
     private final ClientMetricSource metrics;
 
+    /** Marshallers provider. */
+    private final ReflectionMarshallersProvider marshallers = new 
ReflectionMarshallersProvider();
+
     /**
      * Constructor.
      *
@@ -94,10 +98,10 @@ public class TcpIgniteClient implements IgniteClient {
 
         metrics = new ClientMetricSource();
         ch = new ReliableChannel(chFactory, cfg, metrics);
-        tables = new ClientTables(ch);
+        tables = new ClientTables(ch, marshallers);
         transactions = new ClientTransactions(ch);
         compute = new ClientCompute(ch, tables);
-        sql = new ClientSql(ch);
+        sql = new ClientSql(ch, marshallers);
         metricManager = initMetricManager(cfg);
     }
 
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientAsyncResultSet.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientAsyncResultSet.java
index 51cf656195..18265be920 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientAsyncResultSet.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientAsyncResultSet.java
@@ -27,12 +27,12 @@ import 
org.apache.ignite.internal.binarytuple.BinaryTupleReader;
 import org.apache.ignite.internal.client.ClientChannel;
 import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
 import org.apache.ignite.internal.client.proto.ClientOp;
-import org.apache.ignite.internal.client.proto.TuplePart;
 import org.apache.ignite.internal.client.table.ClientColumn;
 import org.apache.ignite.internal.client.table.ClientSchema;
 import org.apache.ignite.internal.marshaller.ClientMarshallerReader;
 import org.apache.ignite.internal.marshaller.Marshaller;
 import org.apache.ignite.internal.marshaller.MarshallerException;
+import org.apache.ignite.internal.marshaller.MarshallersProvider;
 import org.apache.ignite.lang.CursorClosedException;
 import org.apache.ignite.lang.ErrorGroups.Client;
 import org.apache.ignite.lang.IgniteException;
@@ -90,7 +90,7 @@ class ClientAsyncResultSet<T> implements AsyncResultSet<T> {
      * @param in Unpacker.
      * @param mapper Mapper.
      */
-    ClientAsyncResultSet(ClientChannel ch, ClientMessageUnpacker in, @Nullable 
Mapper<T> mapper) {
+    ClientAsyncResultSet(ClientChannel ch, MarshallersProvider marshallers, 
ClientMessageUnpacker in, @Nullable Mapper<T> mapper) {
         this.ch = ch;
 
         resourceId = in.tryUnpackNil() ? null : in.unpackLong();
@@ -102,7 +102,7 @@ class ClientAsyncResultSet<T> implements AsyncResultSet<T> {
 
         this.mapper = mapper;
         marshaller = metadata != null && mapper != null && mapper.targetType() 
!= SqlRow.class
-                ? marshaller(metadata, mapper)
+                ? marshaller(metadata, marshallers, mapper)
                 : null;
 
         if (hasRowSet) {
@@ -300,7 +300,7 @@ class ClientAsyncResultSet<T> implements AsyncResultSet<T> {
         }
     }
 
-    private static <T> Marshaller marshaller(ResultSetMetadata metadata, 
Mapper<T> mapper) {
+    private static <T> Marshaller marshaller(ResultSetMetadata metadata, 
MarshallersProvider marshallers, Mapper<T> mapper) {
         var schemaColumns = new ClientColumn[metadata.columns().size()];
         List<ColumnMetadata> columns = metadata.columns();
 
@@ -320,7 +320,7 @@ class ClientAsyncResultSet<T> implements AsyncResultSet<T> {
             schemaColumns[i] = schemaColumn;
         }
 
-        var schema = new ClientSchema(0, schemaColumns, null);
-        return schema.getMarshaller(mapper, TuplePart.KEY_AND_VAL);
+        var schema = new ClientSchema(0, schemaColumns, null, marshallers);
+        return schema.getMarshaller(mapper);
     }
 }
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java
index 5d6e739683..ce21e337e9 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.client.ReliableChannel;
 import org.apache.ignite.internal.client.proto.ClientBinaryTupleUtils;
 import org.apache.ignite.internal.client.proto.ClientOp;
 import org.apache.ignite.internal.client.tx.ClientTransaction;
+import org.apache.ignite.internal.marshaller.MarshallersProvider;
 import org.apache.ignite.internal.sql.AbstractSession;
 import org.apache.ignite.sql.BatchedArguments;
 import org.apache.ignite.sql.SqlException;
@@ -55,6 +56,8 @@ public class ClientSession implements AbstractSession {
 
     private final ReliableChannel ch;
 
+    private final MarshallersProvider marshallers;
+
     @Nullable
     private final Integer defaultPageSize;
 
@@ -74,6 +77,7 @@ public class ClientSession implements AbstractSession {
      * Constructor.
      *
      * @param ch Channel.
+     * @param marshallers Marshallers provider.
      * @param defaultPageSize Default page size.
      * @param defaultSchema Default schema.
      * @param defaultQueryTimeout Default query timeout.
@@ -83,12 +87,14 @@ public class ClientSession implements AbstractSession {
     @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
     ClientSession(
             ReliableChannel ch,
+            MarshallersProvider marshallers,
             @Nullable Integer defaultPageSize,
             @Nullable String defaultSchema,
             @Nullable Long defaultQueryTimeout,
             @Nullable Long defaultSessionTimeout,
             @Nullable Map<String, Object> properties) {
         this.ch = ch;
+        this.marshallers = marshallers;
         this.defaultPageSize = defaultPageSize;
         this.defaultSchema = defaultSchema;
         this.defaultQueryTimeout = defaultQueryTimeout;
@@ -165,7 +171,7 @@ public class ClientSession implements AbstractSession {
             w.out().packLong(ch.observableTimestamp());
         };
 
-        PayloadReader<AsyncResultSet<T>> payloadReader = r -> new 
ClientAsyncResultSet<>(r.clientChannel(), r.in(), mapper);
+        PayloadReader<AsyncResultSet<T>> payloadReader = r -> new 
ClientAsyncResultSet<>(r.clientChannel(), marshallers, r.in(), mapper);
 
         if (transaction != null) {
             try {
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSessionBuilder.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSessionBuilder.java
index 1fdfdcb481..ad8e3c3d63 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSessionBuilder.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSessionBuilder.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.client.ReliableChannel;
+import org.apache.ignite.internal.marshaller.MarshallersProvider;
 import org.apache.ignite.sql.Session;
 import org.apache.ignite.sql.Session.SessionBuilder;
 import org.jetbrains.annotations.Nullable;
@@ -33,6 +34,9 @@ public class ClientSessionBuilder implements SessionBuilder {
     /** Channel. */
     private final ReliableChannel ch;
 
+    /** Marshallers provider. */
+    private final MarshallersProvider marshallers;
+
     /** Properties. */
     private final Map<String, Object> properties = new HashMap<>();
 
@@ -52,9 +56,11 @@ public class ClientSessionBuilder implements SessionBuilder {
      * Constructor.
      *
      * @param ch Channel.
+     * @param marshallers Marshallers provider.
      */
-    public ClientSessionBuilder(ReliableChannel ch) {
+    public ClientSessionBuilder(ReliableChannel ch, MarshallersProvider 
marshallers) {
         this.ch = ch;
+        this.marshallers = marshallers;
     }
 
     @Override
@@ -127,6 +133,14 @@ public class ClientSessionBuilder implements 
SessionBuilder {
 
     @Override
     public Session build() {
-        return new ClientSession(ch, pageSize, defaultSchema, 
defaultQueryTimeoutMs, defaultSessionTimeoutMs, new HashMap<>(properties));
+        return new ClientSession(
+                ch,
+                marshallers,
+                pageSize,
+                defaultSchema,
+                defaultQueryTimeoutMs,
+                defaultSessionTimeoutMs,
+                new HashMap<>(properties)
+        );
     }
 }
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
index 1d796b3965..4bb3881274 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.client.sql;
 
 import org.apache.ignite.internal.client.ReliableChannel;
+import org.apache.ignite.internal.marshaller.MarshallersProvider;
 import org.apache.ignite.sql.IgniteSql;
 import org.apache.ignite.sql.Session;
 import org.apache.ignite.sql.Session.SessionBuilder;
@@ -31,25 +32,30 @@ public class ClientSql implements IgniteSql {
     /** Channel. */
     private final ReliableChannel ch;
 
+    /** Marshallers provider. */
+    private final MarshallersProvider marshallers;
+
     /**
      * Constructor.
      *
      * @param ch Channel.
+     * @param marshallers Marshallers provider.
      */
-    public ClientSql(ReliableChannel ch) {
+    public ClientSql(ReliableChannel ch, MarshallersProvider marshallers) {
         this.ch = ch;
+        this.marshallers = marshallers;
     }
 
     /** {@inheritDoc} */
     @Override
     public Session createSession() {
-        return new ClientSession(ch, null, null, null, null, null);
+        return new ClientSession(ch, marshallers, null, null, null, null, 
null);
     }
 
     /** {@inheritDoc} */
     @Override
     public SessionBuilder sessionBuilder() {
-        return new ClientSessionBuilder(ch);
+        return new ClientSessionBuilder(ch, marshallers);
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/AbstractClientView.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/AbstractClientView.java
index 9e0dfa5fca..5b2293624e 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/AbstractClientView.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/AbstractClientView.java
@@ -134,7 +134,7 @@ abstract class AbstractClientView<T> implements 
CriteriaQuerySource<T> {
                     SqlSerializer ser = createSqlSerializer(tbl.name(), 
schema.columns(), criteria);
 
                     Statement statement = new 
ClientStatementBuilder().query(ser.toString()).pageSize(opts0.pageSize()).build();
-                    Session session = new 
ClientSessionBuilder(tbl.channel()).build();
+                    Session session = new ClientSessionBuilder(tbl.channel(), 
tbl.marshallers()).build();
 
                     return session.executeAsync(tx, statement, 
ser.getArguments())
                             .<AsyncCursor<T>>thenApply(resultSet -> {
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientSchema.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientSchema.java
index 336e8710f1..326e820c38 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientSchema.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientSchema.java
@@ -23,6 +23,8 @@ import org.apache.ignite.internal.client.proto.TuplePart;
 import org.apache.ignite.internal.marshaller.BinaryMode;
 import org.apache.ignite.internal.marshaller.Marshaller;
 import org.apache.ignite.internal.marshaller.MarshallerColumn;
+import org.apache.ignite.internal.marshaller.MarshallerSchema;
+import org.apache.ignite.internal.marshaller.MarshallersProvider;
 import org.apache.ignite.lang.ColumnNotFoundException;
 import org.apache.ignite.lang.ErrorGroups.Client;
 import org.apache.ignite.lang.IgniteException;
@@ -50,19 +52,27 @@ public class ClientSchema {
     /** Columns map by name. */
     private final Map<String, ClientColumn> map = new HashMap<>();
 
+    /** Marshaller provider. */
+    private final MarshallersProvider marshallers;
+
+    /** Marshaller schema. */
+    private MarshallerSchema marshallerSchema;
+
     /**
      * Constructor.
      *
      * @param ver Schema version.
      * @param columns Columns.
      * @param colocationColumns Colocation columns. When null, all key columns 
are used.
+     * @param marshallers Marshallers provider.
      */
-    public ClientSchema(int ver, ClientColumn[] columns, ClientColumn 
@Nullable [] colocationColumns) {
+    public ClientSchema(int ver, ClientColumn[] columns, ClientColumn 
@Nullable [] colocationColumns, MarshallersProvider marshallers) {
         assert ver >= 0;
         assert columns != null;
 
         this.ver = ver;
         this.columns = columns;
+        this.marshallers = marshallers;
         var keyCnt = 0;
 
         for (var col : columns) {
@@ -148,11 +158,30 @@ public class ClientSchema {
     }
 
     public <T> Marshaller getMarshaller(Mapper mapper, TuplePart part) {
-        // TODO: Cache Marshallers (IGNITE-16094).
         return getMarshaller(mapper, part, part == TuplePart.KEY);
     }
 
+    /** Returns a marshaller for columns defined by this client schema. */
+    public <T> Marshaller getMarshaller(Mapper mapper) {
+        MarshallerColumn[] marshallerColumns = 
toMarshallerColumns(TuplePart.KEY_AND_VAL);
+
+        return marshallers.getMarshaller(marshallerColumns, mapper, true, 
false);
+    }
+
     <T> Marshaller getMarshaller(Mapper mapper, TuplePart part, boolean 
allowUnmappedFields) {
+        switch (part) {
+            case KEY:
+                return marshallers.getKeysMarshaller(marshallerSchema(), 
mapper, true, allowUnmappedFields);
+            case VAL:
+                return marshallers.getValuesMarshaller(marshallerSchema(), 
mapper, true, allowUnmappedFields);
+            case KEY_AND_VAL:
+                return marshallers.getRowMarshaller(marshallerSchema(), 
mapper, true, allowUnmappedFields);
+            default:
+                throw new AssertionError("Unexpected tuple part: " + part);
+        }
+    }
+
+    private MarshallerColumn[] toMarshallerColumns(TuplePart part) {
         int colCount = columns.length;
         int firstColIdx = 0;
 
@@ -171,7 +200,7 @@ public class ClientSchema {
             cols[i] = new MarshallerColumn(col.name(), mode(col.type()), null, 
col.scale());
         }
 
-        return Marshaller.createMarshaller(cols, mapper, true, 
allowUnmappedFields);
+        return cols;
     }
 
     private static BinaryMode mode(ColumnType dataType) {
@@ -231,4 +260,55 @@ public class ClientSchema {
                 throw new IgniteException(Client.PROTOCOL_ERR, "Unknown client 
data type: " + dataType);
         }
     }
+
+    private MarshallerSchema marshallerSchema() {
+        if (marshallerSchema == null) {
+            marshallerSchema = new ClientMarshallerSchema(this);
+        }
+        return marshallerSchema;
+    }
+
+    private static class ClientMarshallerSchema implements MarshallerSchema {
+
+        private final ClientSchema schema;
+
+        private MarshallerColumn[] keys;
+
+        private MarshallerColumn[] values;
+
+        private MarshallerColumn[] row;
+
+        private ClientMarshallerSchema(ClientSchema schema) {
+            this.schema = schema;
+        }
+
+        @Override
+        public int schemaVersion() {
+            return schema.version();
+        }
+
+        @Override
+        public MarshallerColumn[] keys() {
+            if (keys == null) {
+                keys = schema.toMarshallerColumns(TuplePart.KEY);
+            }
+            return keys;
+        }
+
+        @Override
+        public MarshallerColumn[] values() {
+            if (values == null) {
+                values = schema.toMarshallerColumns(TuplePart.VAL);
+            }
+            return values;
+        }
+
+        @Override
+        public MarshallerColumn[] row() {
+            if (row == null) {
+                row = schema.toMarshallerColumns(TuplePart.KEY_AND_VAL);
+            }
+            return row;
+        }
+    }
 }
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
index 64ec8c7987..168f992993 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.client.tx.ClientTransaction;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.marshaller.MarshallersProvider;
 import org.apache.ignite.internal.marshaller.UnmappedColumnsException;
 import org.apache.ignite.internal.tostring.IgniteToStringBuilder;
 import org.apache.ignite.lang.IgniteException;
@@ -65,6 +66,8 @@ public class ClientTable implements Table {
 
     private final ReliableChannel ch;
 
+    private final MarshallersProvider marshallers;
+
     private final ConcurrentHashMap<Integer, CompletableFuture<ClientSchema>> 
schemas = new ConcurrentHashMap<>();
 
     private final IgniteLogger log;
@@ -83,14 +86,17 @@ public class ClientTable implements Table {
      * Constructor.
      *
      * @param ch Channel.
+     * @param marshallers Marshallers provider.
      * @param id Table id.
      * @param name Table name.
      */
-    public ClientTable(ReliableChannel ch, int id, String name) {
+    public ClientTable(ReliableChannel ch, MarshallersProvider marshallers, 
int id, String name) {
         assert ch != null;
+        assert marshallers != null;
         assert name != null && !name.isEmpty();
 
         this.ch = ch;
+        this.marshallers = marshallers;
         this.id = id;
         this.name = name;
         this.log = ClientUtils.logger(ch.configuration(), ClientTable.class);
@@ -114,6 +120,15 @@ public class ClientTable implements Table {
         return ch;
     }
 
+    /**
+     * Gets the marshallers provider.
+     *
+     * @return Marshallers provider.
+     */
+    MarshallersProvider marshallers() {
+        return marshallers;
+    }
+
     /** {@inheritDoc} */
     @Override
     public String name() {
@@ -241,7 +256,7 @@ public class ClientTable implements Table {
             }
         }
 
-        var schema = new ClientSchema(schemaVer, columns, colocationColumns);
+        var schema = new ClientSchema(schemaVer, columns, colocationColumns, 
marshallers);
 
         schemas.put(schemaVer, CompletableFuture.completedFuture(schema));
 
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTables.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTables.java
index 41eb027d8d..80ef7baa78 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTables.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTables.java
@@ -25,6 +25,7 @@ import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.client.ReliableChannel;
 import org.apache.ignite.internal.client.proto.ClientOp;
+import org.apache.ignite.internal.marshaller.MarshallersProvider;
 import org.apache.ignite.table.Table;
 import org.apache.ignite.table.manager.IgniteTables;
 
@@ -34,13 +35,17 @@ import org.apache.ignite.table.manager.IgniteTables;
 public class ClientTables implements IgniteTables {
     private final ReliableChannel ch;
 
+    private final MarshallersProvider marshallers;
+
     /**
      * Constructor.
      *
      * @param ch Channel.
+     * @param marshallers Marshallers provider.
      */
-    public ClientTables(ReliableChannel ch) {
+    public ClientTables(ReliableChannel ch, MarshallersProvider marshallers) {
         this.ch = ch;
+        this.marshallers = marshallers;
     }
 
     /** {@inheritDoc} */
@@ -58,7 +63,7 @@ public class ClientTables implements IgniteTables {
             var res = new ArrayList<Table>(cnt);
 
             for (int i = 0; i < cnt; i++) {
-                res.add(new ClientTable(ch, in.unpackInt(), 
in.unpackString()));
+                res.add(new ClientTable(ch, marshallers, in.unpackInt(), 
in.unpackString()));
             }
 
             return res;
@@ -77,6 +82,6 @@ public class ClientTables implements IgniteTables {
         Objects.requireNonNull(name);
 
         return ch.serviceAsync(ClientOp.TABLE_GET, w -> 
w.out().packString(name),
-                r -> r.in().tryUnpackNil() ? null : new ClientTable(ch, 
r.in().unpackInt(), name));
+                r -> r.in().tryUnpackNil() ? null : new ClientTable(ch, 
marshallers, r.in().unpackInt(), name));
     }
 }
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/ClientTupleTest.java 
b/modules/client/src/test/java/org/apache/ignite/client/ClientTupleTest.java
index 724bcfc39d..f36b23f6b5 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ClientTupleTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ClientTupleTest.java
@@ -47,6 +47,7 @@ import 
org.apache.ignite.internal.binarytuple.BinaryTupleReader;
 import org.apache.ignite.internal.client.table.ClientColumn;
 import org.apache.ignite.internal.client.table.ClientSchema;
 import org.apache.ignite.internal.client.table.ClientTuple;
+import org.apache.ignite.internal.marshaller.ReflectionMarshallersProvider;
 import org.apache.ignite.sql.ColumnType;
 import org.apache.ignite.table.Tuple;
 import org.junit.jupiter.api.Test;
@@ -57,10 +58,12 @@ import org.junit.jupiter.api.Test;
  * <p>Should be in sync with 
org.apache.ignite.internal.table.TupleBuilderImplTest.
  */
 public class ClientTupleTest {
+    private static final ReflectionMarshallersProvider marshallers = new 
ReflectionMarshallersProvider();
+
     private static final ClientSchema SCHEMA = new ClientSchema(1, new 
ClientColumn[]{
             new ClientColumn("ID", ColumnType.INT64, false, true, 0, 0),
             new ClientColumn("NAME", ColumnType.STRING, false, false, -1, 1)
-    }, null);
+    }, null, marshallers);
 
     private static final ClientSchema FULL_SCHEMA = new ClientSchema(100, new 
ClientColumn[]{
             new ClientColumn("I8", ColumnType.INT8, false, false, -1, 0),
@@ -82,7 +85,7 @@ public class ClientTupleTest {
             new ClientColumn("PERIOD", ColumnType.PERIOD, false, false, -1, 
16),
             new ClientColumn("DURATION", ColumnType.DURATION, false, false, 
-1, 17),
             new ClientColumn("NUMBER", ColumnType.NUMBER, false, false, -1, 18)
-    }, null);
+    }, null, marshallers);
 
     private static final UUID GUID = UUID.randomUUID();
 
diff --git 
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
 
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
index 53f1d23f70..0d75203580 100644
--- 
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
+++ 
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
@@ -74,6 +74,7 @@ import 
org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
 import org.apache.ignite.internal.catalog.events.CatalogEvent;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.marshaller.ReflectionMarshallersProvider;
 import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
 import org.apache.ignite.internal.metastorage.impl.MetaStorageService;
 import 
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
@@ -371,7 +372,8 @@ public class IndexManagerTest extends 
BaseIgniteAbstractTest {
         when(internalTable.tableId()).thenReturn(tableId);
         when(internalTable.storage()).thenReturn(mvTableStorage);
 
-        return spy(new TableImpl(internalTable, new HeapLockManager(), new 
ConstantSchemaVersions(1), mock(IgniteSql.class)));
+        ReflectionMarshallersProvider marshallers = new 
ReflectionMarshallersProvider();
+        return spy(new TableImpl(internalTable, new HeapLockManager(), new 
ConstantSchemaVersions(1), marshallers, mock(IgniteSql.class)));
     }
 
     private CompletableFuture<MvTableStorage> 
getMvTableStorageLatestRevision(int tableId) {
diff --git a/modules/marshaller-common/build.gradle 
b/modules/marshaller-common/build.gradle
index a19aaa6426..055f7fcb03 100644
--- a/modules/marshaller-common/build.gradle
+++ b/modules/marshaller-common/build.gradle
@@ -26,6 +26,7 @@ dependencies {
     implementation project(':ignite-core')
     implementation project(':ignite-api')
     implementation libs.jetbrains.annotations
+    implementation libs.caffeine
 
     testAnnotationProcessor libs.jmh.annotation.processor
 
diff --git 
a/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/Marshaller.java
 
b/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/Marshaller.java
index 7adbd581d3..558932de52 100644
--- 
a/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/Marshaller.java
+++ 
b/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/Marshaller.java
@@ -46,7 +46,7 @@ public abstract class Marshaller {
      * @param allowUnmappedFields Whether specified class can contain fields 
that are not mapped to columns.
      * @return Marshaller.
      */
-    public static Marshaller createMarshaller(
+    static Marshaller createMarshaller(
             MarshallerColumn[] cols,
             Mapper<?> mapper,
             boolean requireAllFields,
diff --git 
a/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/MarshallerColumn.java
 
b/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/MarshallerColumn.java
index 73ec221f25..ffc90799cd 100644
--- 
a/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/MarshallerColumn.java
+++ 
b/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/MarshallerColumn.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.marshaller;
 
+import java.util.Objects;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.tostring.IgniteToStringExclude;
 import org.jetbrains.annotations.Nullable;
@@ -88,4 +89,25 @@ public class MarshallerColumn {
     public int scale() {
         return scale;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        // NOTE: This code relies on the fact that marshaller for a list of 
columns is used by client code
+        // and client code does not provide `defValSup`. Because of that 
`defValSup` does not participate in equality/hashcode.
+        // It can't do that anyway, since instances of functional interfaces 
have no identity.
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        MarshallerColumn that = (MarshallerColumn) o;
+        return scale == that.scale && Objects.equals(name, that.name) && type 
== that.type;
+    }
+
+    @Override
+    public int hashCode() {
+        // See comment in equals method.
+        return Objects.hash(name, type, scale);
+    }
 }
diff --git 
a/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/MarshallerSchema.java
 
b/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/MarshallerSchema.java
new file mode 100644
index 0000000000..e2b73ef78a
--- /dev/null
+++ 
b/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/MarshallerSchema.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.marshaller;
+
+/** Marshaller schema. */
+public interface MarshallerSchema {
+    /** Schema version. */
+    int schemaVersion();
+
+    /** Key columns. */
+    MarshallerColumn[] keys();
+
+    /** Value columns. */
+    MarshallerColumn[] values();
+
+    /** All columns - first key columns, then value columns. */
+    MarshallerColumn[] row();
+}
diff --git 
a/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/MarshallersProvider.java
 
b/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/MarshallersProvider.java
new file mode 100644
index 0000000000..220e7f356e
--- /dev/null
+++ 
b/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/MarshallersProvider.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.marshaller;
+
+import org.apache.ignite.table.mapper.Mapper;
+
+/** Provides marshaller instances. */
+public interface MarshallersProvider {
+    /**
+     * Returns a marshaller for key columns of the given schema.
+     *
+     * @param schema Schema.
+     * @param mapper Mapper.
+     * @param requireAllFields If specified class should contain fields for 
all columns.
+     * @param allowUnmappedFields Whether specified class can contain fields 
that are not mapped to columns.
+     * @return Marshaller.
+     */
+    Marshaller getKeysMarshaller(
+            MarshallerSchema schema,
+            Mapper<?> mapper,
+            boolean requireAllFields,
+            boolean allowUnmappedFields
+    );
+
+    /**
+     * Returns a marshaller for value columns of the given schema.
+     *
+     * @param schema Schema.
+     * @param mapper Mapper.
+     * @param requireAllFields If specified class should contain fields for 
all columns.
+     * @param allowUnmappedFields Whether specified class can contain fields 
that are not mapped to columns.
+     * @return Marshaller.
+     */
+    Marshaller getValuesMarshaller(
+            MarshallerSchema schema,
+            Mapper<?> mapper,
+            boolean requireAllFields,
+            boolean allowUnmappedFields
+    );
+
+    /**
+     * Returns a marshaller that includes both key and value columns of the 
given schema.
+     *
+     * @param schema Schema.
+     * @param mapper Mapper.
+     * @param requireAllFields If specified class should contain fields for 
all columns.
+     * @param allowUnmappedFields Whether specified class can contain fields 
that are not mapped to columns.
+     * @return Marshaller.
+     */
+    Marshaller getRowMarshaller(
+            MarshallerSchema schema,
+            Mapper<?> mapper,
+            boolean requireAllFields,
+            boolean allowUnmappedFields
+    );
+
+    /**
+     * Returns a marshaller for the given columns.
+     *
+     * @param columns Columns.
+     * @param mapper Mapper.
+     * @param requireAllFields If specified class should contain fields for 
all columns.
+     * @param allowUnmappedFields Whether specified class can contain fields 
that are not mapped to columns.
+     * @return Marshaller.
+     */
+    Marshaller getMarshaller(
+            MarshallerColumn[] columns,
+            Mapper<?> mapper,
+            boolean requireAllFields,
+            boolean allowUnmappedFields
+    );
+}
diff --git 
a/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/ReflectionMarshallersProvider.java
 
b/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/ReflectionMarshallersProvider.java
new file mode 100644
index 0000000000..1843888342
--- /dev/null
+++ 
b/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/ReflectionMarshallersProvider.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.marshaller;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.function.Function;
+import org.apache.ignite.table.mapper.Mapper;
+
+/** Implementation of {@link MarshallersProvider}. */
+public class ReflectionMarshallersProvider implements MarshallersProvider {
+    /** Marshaller cache size for schema based marshallers (schema version, 
mapper, flags). */
+    private static final int KV_CACHE_SIZE = 64;
+
+    /** Marshaller cache size for column based marshallers (columns, mapper, 
flags). */
+    private static final int PROJECTION_CACHE_SIZE = 64;
+
+    /** Marshaller cache by schema. Cached by schema version. */
+    private final MarshallerCache marshallerCache;
+
+    /** Marshaller cache by arbitrary columns. Cached by columns. */
+    private final MarshallerCache projectionMarshallerCache;
+
+    /** Constructor. */
+    public ReflectionMarshallersProvider() {
+        this.marshallerCache = new MarshallerCache(KV_CACHE_SIZE);
+        this.projectionMarshallerCache = new 
MarshallerCache(PROJECTION_CACHE_SIZE);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Marshaller getKeysMarshaller(
+            MarshallerSchema schema,
+            Mapper<?> mapper,
+            boolean requireAllFields,
+            boolean allowUnmappedFields
+    ) {
+
+        MarshallerCacheKey key = new MarshallerCacheKey(
+                schema.schemaVersion(), MarshallerType.KEY_ONLY, mapper, 
requireAllFields, allowUnmappedFields
+        );
+
+        return marshallerCache.getOrAdd(key, k -> {
+            return Marshaller.createMarshaller(schema.keys(), key.mapper, 
key.requireAllFields, key.allowUnmappedFields);
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Marshaller getValuesMarshaller(
+            MarshallerSchema schema,
+            Mapper<?> mapper,
+            boolean requireAllFields,
+            boolean allowUnmappedFields) {
+
+        MarshallerCacheKey key = new MarshallerCacheKey(
+                schema.schemaVersion(), MarshallerType.VALUE_ONLY, mapper, 
requireAllFields, allowUnmappedFields
+        );
+
+        return marshallerCache.getOrAdd(key, k -> {
+            return Marshaller.createMarshaller(schema.values(), key.mapper, 
key.requireAllFields, key.allowUnmappedFields);
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Marshaller getRowMarshaller(
+            MarshallerSchema schema,
+            Mapper<?> mapper,
+            boolean requireAllFields,
+            boolean allowUnmappedFields
+    ) {
+
+        MarshallerCacheKey key = new MarshallerCacheKey(
+                schema.schemaVersion(), MarshallerType.FULL_ROW, mapper, 
requireAllFields, allowUnmappedFields
+        );
+
+        return marshallerCache.getOrAdd(key, k -> {
+            return Marshaller.createMarshaller(schema.row(), key.mapper, 
key.requireAllFields, key.allowUnmappedFields);
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Marshaller getMarshaller(
+            MarshallerColumn[] columns,
+            Mapper<?> mapper,
+            boolean requireAllFields,
+            boolean allowUnmappedFields
+    ) {
+
+        MarshallerCacheKey key = new MarshallerCacheKey(columns, mapper, 
requireAllFields, allowUnmappedFields);
+
+        return projectionMarshallerCache.getOrAdd(key, k -> {
+            return Marshaller.createMarshaller(k.columns, k.mapper, 
k.requireAllFields, k.allowUnmappedFields);
+        });
+    }
+
+    private static class MarshallerCache {
+
+        private final Cache<MarshallerCacheKey, Marshaller> cache;
+
+        MarshallerCache(int maximumSize) {
+            cache = Caffeine.newBuilder()
+                    .maximumSize(maximumSize)
+                    .build();
+        }
+
+        Marshaller getOrAdd(MarshallerCacheKey key, 
Function<MarshallerCacheKey, Marshaller> func) {
+            return cache.get(key, func);
+        }
+    }
+
+    private enum MarshallerType {
+        KEY_ONLY,
+        VALUE_ONLY,
+        FULL_ROW
+    }
+
+    private static final class MarshallerCacheKey {
+        private static final MarshallerColumn[] NO_COLUMNS = new 
MarshallerColumn[0];
+
+        private final int schemaVersion;
+
+        private final Mapper<?> mapper;
+
+        private final MarshallerColumn[] columns;
+
+        private final MarshallerType type;
+
+        private final boolean requireAllFields;
+
+        private final boolean allowUnmappedFields;
+
+        MarshallerCacheKey(
+                int schemaVersion,
+                MarshallerType type,
+                Mapper<?> mapper,
+                boolean requireAllFields,
+                boolean allowUnmappedFields
+        ) {
+            this.schemaVersion = schemaVersion;
+            this.columns = NO_COLUMNS;
+            this.type = type;
+            this.mapper = mapper;
+            this.requireAllFields = requireAllFields;
+            this.allowUnmappedFields = allowUnmappedFields;
+        }
+
+        MarshallerCacheKey(
+                MarshallerColumn[] columns,
+                Mapper<?> mapper,
+                boolean requireAllFields,
+                boolean allowUnmappedFields
+        ) {
+            this.schemaVersion = -1;
+            this.columns = columns;
+            this.mapper = mapper;
+            this.type = null;
+            this.requireAllFields = requireAllFields;
+            this.allowUnmappedFields = allowUnmappedFields;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            MarshallerCacheKey that = (MarshallerCacheKey) o;
+            return schemaVersion == that.schemaVersion && requireAllFields == 
that.requireAllFields
+                    && allowUnmappedFields == that.allowUnmappedFields && 
Objects.equals(mapper, that.mapper) && Arrays.equals(
+                    columns, that.columns) && type == that.type;
+        }
+
+        @Override
+        public int hashCode() {
+            int result = Objects.hash(schemaVersion, mapper, type, 
requireAllFields, allowUnmappedFields);
+            result = 31 * result + Arrays.hashCode(columns);
+            return result;
+        }
+    }
+}
diff --git 
a/modules/marshaller-common/src/test/java/org/apache/ignite/internal/marshaller/ReflectionMarshallersProviderSelfTest.java
 
b/modules/marshaller-common/src/test/java/org/apache/ignite/internal/marshaller/ReflectionMarshallersProviderSelfTest.java
new file mode 100644
index 0000000000..c0e931ced1
--- /dev/null
+++ 
b/modules/marshaller-common/src/test/java/org/apache/ignite/internal/marshaller/ReflectionMarshallersProviderSelfTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.marshaller;
+
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertSame;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.Stream;
+import org.apache.ignite.table.mapper.Mapper;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+/** Tests for {@link ReflectionMarshallersProvider}. */
+public class ReflectionMarshallersProviderSelfTest {
+
+    private final MarshallersProvider marshallers = new 
ReflectionMarshallersProvider();
+
+    @ParameterizedTest
+    @EnumSource(MarshallerType.class)
+    public void testMarshallerCache(MarshallerType marshallerType) {
+        Mapper<TestPoJo> mapper = Mapper.of(TestPoJo.class);
+
+        // This test assumes that Mappers are cached.
+
+        TestMarshallerSchema schema1 = new MarshallerSchemaBuilder()
+                .version(1)
+                .addKey("col1", BinaryMode.INT)
+                .addValue("col2", BinaryMode.INT)
+                .build();
+
+        // Same schema - same versions, same content
+
+        {
+            Marshaller m1 = marshallerType.get(marshallers, schema1, mapper, 
false, true);
+            Marshaller m2 = marshallerType.get(marshallers, schema1, mapper, 
false, true);
+            Marshaller m3 = marshallerType.get(marshallers, schema1, mapper, 
true, true);
+
+            assertSame(m1, m2);
+            assertNotSame(m1, m3);
+            assertNotSame(m2, m3);
+        }
+
+        TestMarshallerSchema schema2 = new MarshallerSchemaBuilder()
+                .version(schema1.version + 1)
+                .addKey("col1", BinaryMode.INT)
+                .addValue("col2", BinaryMode.INT)
+                .addValue("col3", BinaryMode.INT)
+                .build();
+
+        // Different schemas - different versions, different content
+        {
+            Marshaller m1 = marshallerType.get(marshallers, schema1, mapper, 
false, true);
+            Marshaller m2 = marshallerType.get(marshallers, schema2, mapper, 
false, true);
+            Marshaller m3 = marshallerType.get(marshallers, schema2, mapper, 
true, true);
+
+            assertNotSame(m1, m2);
+            assertNotSame(m1, m3);
+        }
+
+        TestMarshallerSchema schema3 = new MarshallerSchemaBuilder()
+                .version(schema2.version + 1)
+                .addKey("col1", BinaryMode.INT)
+                .addValue("col2", BinaryMode.INT)
+                .build();
+
+        // Different schemas - different versions, same content
+        {
+            Marshaller m1 = marshallerType.get(marshallers, schema1, mapper, 
false, true);
+            Marshaller m2 = marshallerType.get(marshallers, schema3, mapper, 
false, true);
+            Marshaller m3 = marshallerType.get(marshallers, schema3, mapper, 
true, true);
+
+            if (marshallerType.cacheBySchemaColumns()) {
+                assertSame(m1, m2);
+                assertNotSame(m1, m3);
+            } else {
+                assertNotSame(m1, m2);
+                assertNotSame(m1, m3);
+            }
+        }
+    }
+
+    enum MarshallerType {
+        /** Uses only key columns. */
+        KEYS,
+        /** Uses only value columns. */
+        VALUES,
+        /** All schema columns. */
+        ROW,
+        /** Arbitrary columns. */
+        PROJECTION
+        ;
+
+        Marshaller get(MarshallersProvider marshallers, TestMarshallerSchema 
schema,
+                Mapper<?> mapper,
+                boolean requireAllFields,
+                boolean allowUnmappedFields) {
+
+            switch (this) {
+                case KEYS:
+                    return marshallers.getKeysMarshaller(schema.schema, 
mapper, requireAllFields, allowUnmappedFields);
+                case VALUES:
+                    return marshallers.getValuesMarshaller(schema.schema, 
mapper, requireAllFields, allowUnmappedFields);
+                case ROW:
+                    return marshallers.getRowMarshaller(schema.schema, mapper, 
requireAllFields, allowUnmappedFields);
+                case PROJECTION:
+                    return marshallers.getMarshaller(schema.columns, mapper, 
requireAllFields, allowUnmappedFields);
+                default:
+                    throw new UnsupportedOperationException("Unexpected 
marshaller type " + this);
+            }
+        }
+
+        boolean cacheBySchemaColumns() {
+            return PROJECTION == this;
+        }
+    }
+
+    @SuppressWarnings("unused")
+    private static class TestPoJo {
+
+        private int col1;
+
+        private int col2;
+
+        private int col3;
+    }
+
+    private static class TestMarshallerSchema {
+
+        private final MarshallerSchema schema;
+
+        private final MarshallerColumn[] columns;
+
+        private final int version;
+
+        TestMarshallerSchema(MarshallerSchema schema, MarshallerColumn[] 
columns) {
+            this.schema = schema;
+            this.columns = columns;
+            this.version = schema.schemaVersion();
+        }
+    }
+
+    private static class MarshallerSchemaBuilder {
+
+        private int version;
+
+        private final List<MarshallerColumn> keys = new ArrayList<>();
+
+        private final List<MarshallerColumn> values = new ArrayList<>();
+
+        MarshallerSchemaBuilder version(int version) {
+            this.version = version;
+            return this;
+        }
+
+        MarshallerSchemaBuilder addKey(String name, BinaryMode binaryMode) {
+            keys.add(new MarshallerColumn(name.toUpperCase(Locale.US), 
binaryMode));
+            return this;
+        }
+
+        MarshallerSchemaBuilder addValue(String name, BinaryMode binaryMode) {
+            values.add(new MarshallerColumn(name.toUpperCase(Locale.US), 
binaryMode));
+            return this;
+        }
+
+        TestMarshallerSchema build() {
+            MarshallerSchema schema = new MarshallerSchema() {
+                @Override
+                public int schemaVersion() {
+                    return version;
+                }
+
+                @Override
+                public MarshallerColumn[] keys() {
+                    return keys.toArray(MarshallerColumn[]::new);
+                }
+
+                @Override
+                public MarshallerColumn[] values() {
+                    return values.toArray(MarshallerColumn[]::new);
+                }
+
+                @Override
+                public MarshallerColumn[] row() {
+                    return Stream.concat(keys.stream(), 
values.stream()).toArray(MarshallerColumn[]::new);
+                }
+            };
+
+            return new TestMarshallerSchema(schema, schema.row());
+        }
+    }
+}
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItThinClientColocationTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItThinClientColocationTest.java
index 8be8a69bf9..074d763f17 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItThinClientColocationTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItThinClientColocationTest.java
@@ -32,6 +32,7 @@ import 
org.apache.ignite.internal.catalog.commands.CatalogUtils;
 import org.apache.ignite.internal.client.table.ClientColumn;
 import org.apache.ignite.internal.client.table.ClientSchema;
 import org.apache.ignite.internal.client.table.ClientTupleSerializer;
+import org.apache.ignite.internal.marshaller.ReflectionMarshallersProvider;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.marshaller.TupleMarshaller;
@@ -52,6 +53,8 @@ import org.junit.jupiter.params.provider.ValueSource;
  * Tests that client and server have matching colocation logic.
  */
 public class ItThinClientColocationTest extends ClusterPerClassIntegrationTest 
{
+    private static ReflectionMarshallersProvider marshallers = new 
ReflectionMarshallersProvider();
+
     @ParameterizedTest
     @MethodSource("nativeTypes")
     public void testClientAndServerColocationHashesAreSame(NativeType type)
@@ -114,7 +117,7 @@ public class ItThinClientColocationTest extends 
ClusterPerClassIntegrationTest {
                 ClientTableCommon.getDecimalScale(type),
                 ClientTableCommon.getPrecision(type));
 
-        return new ClientSchema(0, new ClientColumn[]{clientColumn}, null);
+        return new ClientSchema(0, new ClientColumn[]{clientColumn}, null, 
marshallers);
     }
 
     private static TupleMarshallerImpl tupleMarshaller(NativeType type, String 
columnName) {
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaDescriptor.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaDescriptor.java
index 062a89dd65..845af4ffb3 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaDescriptor.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaDescriptor.java
@@ -27,8 +27,11 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Stream;
+import org.apache.ignite.internal.marshaller.MarshallerColumn;
+import org.apache.ignite.internal.marshaller.MarshallerSchema;
 import org.apache.ignite.internal.schema.mapping.ColumnMapper;
 import org.apache.ignite.internal.schema.mapping.ColumnMapping;
+import org.apache.ignite.internal.schema.marshaller.MarshallerUtil;
 import org.apache.ignite.internal.tostring.S;
 import org.apache.ignite.internal.type.TemporalNativeType;
 import org.apache.ignite.internal.util.ArrayUtils;
@@ -64,6 +67,9 @@ public class SchemaDescriptor {
     /** Column mapper. */
     private ColumnMapper colMapper = ColumnMapping.identityMapping();
 
+    /** Marshaller schema. */
+    private MarshallerSchema marshallerSchema;
+
     /**
      * Constructor.
      *
@@ -272,4 +278,57 @@ public class SchemaDescriptor {
     public String toString() {
         return S.toString(SchemaDescriptor.class, this);
     }
+
+    /** Returns marshaller schema. */
+    public MarshallerSchema marshallerSchema() {
+        if (marshallerSchema == null) {
+            marshallerSchema = new ServerMarshallerSchema(this);
+        }
+        return marshallerSchema;
+    }
+
+    private static class ServerMarshallerSchema implements MarshallerSchema {
+
+        private final SchemaDescriptor schema;
+
+        private MarshallerColumn[] keys;
+
+        private MarshallerColumn[] values;
+
+        private MarshallerColumn[] row;
+
+        private ServerMarshallerSchema(SchemaDescriptor schema) {
+            this.schema = schema;
+        }
+
+        @Override
+        public int schemaVersion() {
+            return schema.version();
+        }
+
+        @Override
+        public MarshallerColumn[] keys() {
+            if (keys == null) {
+                keys = 
MarshallerUtil.toMarshallerColumns(schema.keyColumns().columns());
+            }
+            return keys;
+        }
+
+        @Override
+        public MarshallerColumn[] values() {
+            if (values == null) {
+                values = 
MarshallerUtil.toMarshallerColumns(schema.valueColumns().columns());
+            }
+            return values;
+        }
+
+        @Override
+        public MarshallerColumn[] row() {
+            if (row == null) {
+                Column[] cols = 
ArrayUtils.concat(schema.keyColumns().columns(), 
schema.valueColumns().columns());
+                row = MarshallerUtil.toMarshallerColumns(cols);
+            }
+            return row;
+        }
+    }
 }
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/KvMarshallerImpl.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/KvMarshallerImpl.java
index 45d3ebd888..bacf9e39c9 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/KvMarshallerImpl.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/KvMarshallerImpl.java
@@ -17,10 +17,10 @@
 
 package org.apache.ignite.internal.schema.marshaller.reflection;
 
-import static 
org.apache.ignite.internal.schema.marshaller.MarshallerUtil.toMarshallerColumns;
-
 import org.apache.ignite.internal.marshaller.Marshaller;
 import org.apache.ignite.internal.marshaller.MarshallerException;
+import org.apache.ignite.internal.marshaller.MarshallerSchema;
+import org.apache.ignite.internal.marshaller.MarshallersProvider;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.marshaller.KvMarshaller;
 import org.apache.ignite.internal.schema.row.Row;
@@ -54,17 +54,19 @@ public class KvMarshallerImpl<K, V> implements 
KvMarshaller<K, V> {
      * Creates KV marshaller.
      *
      * @param schema Schema descriptor.
+     * @param marshallers Marshallers provider.
      * @param keyMapper Mapper for key objects.
      * @param valueMapper Mapper for value objects.
      */
-    public KvMarshallerImpl(SchemaDescriptor schema, Mapper<K> keyMapper, 
Mapper<V> valueMapper) {
+    public KvMarshallerImpl(SchemaDescriptor schema, MarshallersProvider 
marshallers, Mapper<K> keyMapper, Mapper<V> valueMapper) {
         this.schema = schema;
 
         keyClass = keyMapper.targetType();
         valClass = valueMapper.targetType();
 
-        keyMarsh = 
Marshaller.createMarshaller(toMarshallerColumns(schema.keyColumns().columns()), 
keyMapper, true, false);
-        valMarsh = 
Marshaller.createMarshaller(toMarshallerColumns(schema.valueColumns().columns()),
 valueMapper, true, false);
+        MarshallerSchema marshallerSchema = schema.marshallerSchema();
+        keyMarsh = marshallers.getKeysMarshaller(marshallerSchema, keyMapper, 
true, false);
+        valMarsh = marshallers.getValuesMarshaller(marshallerSchema, 
valueMapper, true, false);
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/RecordMarshallerImpl.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/RecordMarshallerImpl.java
index e64e1d8fe3..350cf6ebb3 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/RecordMarshallerImpl.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/RecordMarshallerImpl.java
@@ -17,17 +17,15 @@
 
 package org.apache.ignite.internal.schema.marshaller.reflection;
 
-import static 
org.apache.ignite.internal.schema.marshaller.MarshallerUtil.toMarshallerColumns;
-
 import java.util.Objects;
 import org.apache.ignite.internal.marshaller.Marshaller;
-import org.apache.ignite.internal.marshaller.MarshallerColumn;
 import org.apache.ignite.internal.marshaller.MarshallerException;
+import org.apache.ignite.internal.marshaller.MarshallerSchema;
+import org.apache.ignite.internal.marshaller.MarshallersProvider;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.marshaller.RecordMarshaller;
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.schema.row.RowAssembler;
-import org.apache.ignite.internal.util.ArrayUtils;
 import org.apache.ignite.table.mapper.Mapper;
 import org.apache.ignite.table.mapper.PojoMapper;
 import org.jetbrains.annotations.Nullable;
@@ -57,22 +55,21 @@ public class RecordMarshallerImpl<R> implements 
RecordMarshaller<R> {
      * Creates KV marshaller.
      *
      * @param schema Schema descriptor.
+     * @param marshallers Marshaller provider.
      * @param mapper Mapper for record objects.
      */
-    public RecordMarshallerImpl(SchemaDescriptor schema, Mapper<R> mapper) {
+    public RecordMarshallerImpl(SchemaDescriptor schema, MarshallersProvider 
marshallers, Mapper<R> mapper) {
         assert mapper instanceof PojoMapper;
 
         this.schema = schema;
 
         recClass = mapper.targetType();
 
-        MarshallerColumn[] keyColumns = 
toMarshallerColumns(schema.keyColumns().columns());
-        MarshallerColumn[] valueColumns = 
toMarshallerColumns(schema.valueColumns().columns());
-
-        keyMarsh = Marshaller.createMarshaller(keyColumns, mapper, true, true);
-        valMarsh = Marshaller.createMarshaller(valueColumns, mapper, false, 
true);
+        MarshallerSchema marshallerSchema = schema.marshallerSchema();
 
-        recMarsh = Marshaller.createMarshaller(ArrayUtils.concat(keyColumns, 
valueColumns), mapper, false, false);
+        keyMarsh = marshallers.getKeysMarshaller(marshallerSchema, mapper, 
true, true);
+        valMarsh = marshallers.getValuesMarshaller(marshallerSchema, mapper, 
false, true);
+        recMarsh = marshallers.getRowMarshaller(marshallerSchema, mapper, 
false, false);
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/ReflectionMarshallerFactory.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/ReflectionMarshallerFactory.java
index 2d783d76e2..a2c0406017 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/ReflectionMarshallerFactory.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/reflection/ReflectionMarshallerFactory.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.schema.marshaller.reflection;
 
+import org.apache.ignite.internal.marshaller.MarshallersProvider;
+import org.apache.ignite.internal.marshaller.ReflectionMarshallersProvider;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.marshaller.KvMarshaller;
 import org.apache.ignite.internal.schema.marshaller.MarshallerFactory;
@@ -27,15 +29,17 @@ import org.apache.ignite.table.mapper.Mapper;
  * Factory for reflection-based marshaller.
  */
 public class ReflectionMarshallerFactory implements MarshallerFactory {
+    private final MarshallersProvider marshallers = new 
ReflectionMarshallersProvider();
+
     /** {@inheritDoc} */
     @Override
     public <K, V> KvMarshaller<K, V> create(SchemaDescriptor schema, Mapper<K> 
keyMapper, Mapper<V> valueMapper) {
-        return new KvMarshallerImpl<>(schema, keyMapper, valueMapper);
+        return new KvMarshallerImpl<>(schema, marshallers, keyMapper, 
valueMapper);
     }
 
     /** {@inheritDoc} */
     @Override
     public <R> RecordMarshaller<R> create(SchemaDescriptor schema, Mapper<R> 
mapper) {
-        return new RecordMarshallerImpl<>(schema, mapper);
+        return new RecordMarshallerImpl<>(schema, marshallers, mapper);
     }
 }
diff --git 
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/KvMarshallerTest.java
 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/KvMarshallerTest.java
index 56e2abe045..fc63c4d405 100644
--- 
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/KvMarshallerTest.java
+++ 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/KvMarshallerTest.java
@@ -63,6 +63,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import javax.annotation.processing.Generated;
@@ -107,6 +108,9 @@ public class KvMarshallerTest {
         );
     }
 
+    /** Schema version. */
+    private static final AtomicInteger schemaVersion = new AtomicInteger();
+
     /** Random. */
     private Random rnd;
 
@@ -153,7 +157,7 @@ public class KvMarshallerTest {
     @ParameterizedTest
     @MethodSource("marshallerFactoryProvider")
     public void pojoWithFieldsOfAllTypes(MarshallerFactory factory) throws 
MarshallerException {
-        SchemaDescriptor schema = new SchemaDescriptor(1, 
columnsAllTypes(false), columnsAllTypes(true));
+        SchemaDescriptor schema = new 
SchemaDescriptor(schemaVersion.incrementAndGet(), columnsAllTypes(false), 
columnsAllTypes(true));
 
         final TestObjectWithAllTypes key = 
TestObjectWithAllTypes.randomKey(rnd);
         final TestObjectWithAllTypes val = 
TestObjectWithAllTypes.randomObject(rnd);
@@ -187,7 +191,7 @@ public class KvMarshallerTest {
                 new Column("uuidCol".toUpperCase(), UUID, false),
         };
 
-        SchemaDescriptor schema = new SchemaDescriptor(1, cols, 
columnsAllTypes(true));
+        SchemaDescriptor schema = new 
SchemaDescriptor(schemaVersion.incrementAndGet(), cols, columnsAllTypes(true));
 
         IllegalArgumentException ex = assertThrows(
                 IllegalArgumentException.class,
@@ -205,7 +209,7 @@ public class KvMarshallerTest {
                 new Column("stringCol".toUpperCase(), STRING, false),
         };
 
-        SchemaDescriptor schema = new SchemaDescriptor(1, cols, cols);
+        SchemaDescriptor schema = new 
SchemaDescriptor(schemaVersion.incrementAndGet(), cols, cols);
 
         IllegalArgumentException ex = assertThrows(
                 IllegalArgumentException.class,
@@ -225,7 +229,7 @@ public class KvMarshallerTest {
     public void columnNameMapping(MarshallerFactory factory) throws 
MarshallerException {
         Assumptions.assumeFalse(factory instanceof AsmMarshallerGenerator, 
"Generated marshaller doesn't support column mapping, yet.");
 
-        SchemaDescriptor schema = new SchemaDescriptor(1,
+        SchemaDescriptor schema = new 
SchemaDescriptor(schemaVersion.incrementAndGet(),
                 new Column[]{new Column("key".toUpperCase(), INT64, false)},
                 new Column[]{
                         new Column("col1".toUpperCase(), INT64, false),
@@ -264,7 +268,7 @@ public class KvMarshallerTest {
     @MethodSource("marshallerFactoryProvider")
     public void classWithWrongFieldType(MarshallerFactory factory) {
         SchemaDescriptor schema = new SchemaDescriptor(
-                1,
+                schemaVersion.incrementAndGet(),
                 new Column[]{
                         new Column("longCol".toUpperCase(), 
NativeTypes.bitmaskOf(42), false),
                         new Column("intCol".toUpperCase(), UUID, false)
@@ -300,7 +304,7 @@ public class KvMarshallerTest {
                 new Column("primitiveDoubleCol", DOUBLE, false)
         };
 
-        SchemaDescriptor schema = new SchemaDescriptor(1, keyCols, valCols);
+        SchemaDescriptor schema = new 
SchemaDescriptor(schemaVersion.incrementAndGet(), keyCols, valCols);
 
         assertThrows(IllegalArgumentException.class, () -> 
factory.create(schema, TestKeyObject.class, TestObjectWithAllTypes.class));
     }
@@ -309,7 +313,7 @@ public class KvMarshallerTest {
     @MethodSource("marshallerFactoryProvider")
     public void classWithIncorrectBitmaskSize(MarshallerFactory factory) {
         SchemaDescriptor schema = new SchemaDescriptor(
-                1,
+                schemaVersion.incrementAndGet(),
                 new Column[]{ new Column("key".toUpperCase(), INT32, false) },
                 new Column[]{ new Column("bitmaskCol".toUpperCase(), 
NativeTypes.bitmaskOf(9), true) }
         );
@@ -336,7 +340,7 @@ public class KvMarshallerTest {
                 new Column("primIntCol".toUpperCase(), INT32, false),
         };
 
-        SchemaDescriptor schema = new SchemaDescriptor(1, cols, cols);
+        SchemaDescriptor schema = new 
SchemaDescriptor(schemaVersion.incrementAndGet(), cols, cols);
 
         KvMarshaller<TestObjectWithPrivateConstructor, 
TestObjectWithPrivateConstructor> marshaller =
                 factory.create(schema, TestObjectWithPrivateConstructor.class, 
TestObjectWithPrivateConstructor.class);
@@ -363,7 +367,7 @@ public class KvMarshallerTest {
                 new Column("primLongCol", INT64, false),
         };
 
-        SchemaDescriptor schema = new SchemaDescriptor(1, cols, cols);
+        SchemaDescriptor schema = new 
SchemaDescriptor(schemaVersion.incrementAndGet(), cols, cols);
 
         final Object key = 
TestObjectWithNoDefaultConstructor.randomObject(rnd);
         final Object val = 
TestObjectWithNoDefaultConstructor.randomObject(rnd);
@@ -378,7 +382,7 @@ public class KvMarshallerTest {
                 new Column("primLongCol".toUpperCase(), INT64, false),
         };
 
-        SchemaDescriptor schema = new SchemaDescriptor(1, cols, cols);
+        SchemaDescriptor schema = new 
SchemaDescriptor(schemaVersion.incrementAndGet(), cols, cols);
 
         final ObjectFactory<PrivateTestObject> objFactory = new 
ObjectFactory<>(PrivateTestObject.class);
         final KvMarshaller<PrivateTestObject, PrivateTestObject> marshaller =
@@ -413,7 +417,7 @@ public class KvMarshallerTest {
                     new Column("col2".toUpperCase(), INT64, false),
             };
 
-            SchemaDescriptor schema = new SchemaDescriptor(1, keyCols, 
valCols);
+            SchemaDescriptor schema = new 
SchemaDescriptor(schemaVersion.incrementAndGet(), keyCols, valCols);
 
             final Class<?> valClass = createGeneratedObjectClass();
             final ObjectFactory<?> objFactory = new ObjectFactory<>(valClass);
@@ -441,7 +445,7 @@ public class KvMarshallerTest {
         Assumptions.assumeFalse(factory instanceof AsmMarshallerGenerator, 
"Generated marshaller doesn't support column mapping.");
 
         final SchemaDescriptor schema = new SchemaDescriptor(
-                1,
+                schemaVersion.incrementAndGet(),
                 new Column[]{new Column("key", INT64, false)},
                 new Column[]{new Column("val", BYTES, true),
                 });
@@ -516,7 +520,7 @@ public class KvMarshallerTest {
         Column[] keyCols = {new Column("key", keyType, false)};
         Column[] valCols = {new Column("val", valType, false)};
 
-        SchemaDescriptor schema = new SchemaDescriptor(1, keyCols, valCols);
+        SchemaDescriptor schema = new 
SchemaDescriptor(schemaVersion.incrementAndGet(), keyCols, valCols);
 
         KvMarshaller<Object, Object> marshaller = factory.create(schema,
                 Mapper.of((Class<Object>) key.getClass(), "\"key\""),
diff --git 
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/RecordMarshallerTest.java
 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/RecordMarshallerTest.java
index a3bf4a4d19..2520d5599f 100644
--- 
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/RecordMarshallerTest.java
+++ 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/RecordMarshallerTest.java
@@ -53,6 +53,7 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.Objects;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Stream;
 import javax.annotation.processing.Generated;
 import org.apache.ignite.internal.marshaller.MarshallerException;
@@ -87,6 +88,9 @@ public class RecordMarshallerTest {
         return List.of(new ReflectionMarshallerFactory());
     }
 
+    /** Schema version. */
+    private static final AtomicInteger schemaVersion = new AtomicInteger();
+
     /** Random. */
     private Random rnd;
 
@@ -105,7 +109,7 @@ public class RecordMarshallerTest {
     @ParameterizedTest
     @MethodSource("marshallerFactoryProvider")
     public void complexType(MarshallerFactory factory) throws 
MarshallerException {
-        SchemaDescriptor schema = new SchemaDescriptor(1, keyColumns(), 
valueColumnsAllTypes());
+        SchemaDescriptor schema = new 
SchemaDescriptor(schemaVersion.incrementAndGet(), keyColumns(), 
valueColumnsAllTypes());
 
         final TestObjectWithAllTypes rec = 
TestObjectWithAllTypes.randomObject(rnd);
 
@@ -123,7 +127,7 @@ public class RecordMarshallerTest {
     @ParameterizedTest
     @MethodSource("marshallerFactoryProvider")
     public void truncatedType(MarshallerFactory factory) throws 
MarshallerException {
-        SchemaDescriptor schema = new SchemaDescriptor(1, keyColumns(), 
valueColumnsAllTypes());
+        SchemaDescriptor schema = new 
SchemaDescriptor(schemaVersion.incrementAndGet(), keyColumns(), 
valueColumnsAllTypes());
 
         RecordMarshaller<TestTruncatedObject> marshaller = 
factory.create(schema, TestTruncatedObject.class);
 
@@ -142,7 +146,7 @@ public class RecordMarshallerTest {
     @MethodSource("marshallerFactoryProvider")
     public void widerType(MarshallerFactory factory) {
         SchemaDescriptor schema = new SchemaDescriptor(
-                1,
+                schemaVersion.incrementAndGet(),
                 keyColumns(),
                 new Column[]{
                         new Column("primitiveDoubleCol".toUpperCase(), DOUBLE, 
false),
@@ -165,7 +169,7 @@ public class RecordMarshallerTest {
     @ParameterizedTest
     @MethodSource("marshallerFactoryProvider")
     public void mapping(MarshallerFactory factory) throws MarshallerException {
-        SchemaDescriptor schema = new SchemaDescriptor(1,
+        SchemaDescriptor schema = new 
SchemaDescriptor(schemaVersion.incrementAndGet(),
                 new Column[]{new Column("key".toUpperCase(), INT64, false)},
                 new Column[]{
                         new Column("col1".toUpperCase(), INT32, false),
@@ -198,7 +202,7 @@ public class RecordMarshallerTest {
     @MethodSource("marshallerFactoryProvider")
     public void classWithWrongFieldType(MarshallerFactory factory) {
         SchemaDescriptor schema = new SchemaDescriptor(
-                1,
+                schemaVersion.incrementAndGet(),
                 new Column[]{
                         new Column("longCol".toUpperCase(), 
NativeTypes.bitmaskOf(42), false),
                         new Column("intCol".toUpperCase(), UUID, false)
@@ -220,7 +224,7 @@ public class RecordMarshallerTest {
     @MethodSource("marshallerFactoryProvider")
     public void classWithIncorrectBitmaskSize(MarshallerFactory factory) {
         SchemaDescriptor schema = new SchemaDescriptor(
-                1,
+                schemaVersion.incrementAndGet(),
                 new Column[]{ new Column("key".toUpperCase(), INT32, false) },
                 new Column[]{ new Column("bitmaskCol".toUpperCase(), 
NativeTypes.bitmaskOf(9), true) }
         );
@@ -237,7 +241,7 @@ public class RecordMarshallerTest {
     @MethodSource("marshallerFactoryProvider")
     public void classWithPrivateConstructor(MarshallerFactory factory) throws 
MarshallerException, IllegalAccessException {
         SchemaDescriptor schema = new SchemaDescriptor(
-                1,
+                schemaVersion.incrementAndGet(),
                 new Column[]{new Column("primLongCol".toUpperCase(), INT64, 
false)},
                 new Column[]{new Column("primIntCol".toUpperCase(), INT32, 
false)}
         );
@@ -257,7 +261,7 @@ public class RecordMarshallerTest {
     @MethodSource("marshallerFactoryProvider")
     public void classWithNoDefaultConstructor(MarshallerFactory factory) {
         SchemaDescriptor schema = new SchemaDescriptor(
-                1,
+                schemaVersion.incrementAndGet(),
                 new Column[]{new Column("primLongCol".toUpperCase(), INT64, 
false)},
                 new Column[]{new Column("primIntCol".toUpperCase(), INT32, 
false)}
         );
@@ -271,7 +275,7 @@ public class RecordMarshallerTest {
     @MethodSource("marshallerFactoryProvider")
     public void privateClass(MarshallerFactory factory) throws 
MarshallerException {
         SchemaDescriptor schema = new SchemaDescriptor(
-                1,
+                schemaVersion.incrementAndGet(),
                 new Column[]{new Column("primLongCol".toUpperCase(), INT64, 
false)},
                 new Column[]{new Column("primIntCol".toUpperCase(), INT32, 
false)}
         );
@@ -306,7 +310,7 @@ public class RecordMarshallerTest {
                     new Column("col2".toUpperCase(), INT64, false),
             };
 
-            SchemaDescriptor schema = new SchemaDescriptor(1, keyCols, 
valCols);
+            SchemaDescriptor schema = new 
SchemaDescriptor(schemaVersion.incrementAndGet(), keyCols, valCols);
 
             final Class<Object> recClass = (Class<Object>) 
createGeneratedObjectClass();
             final ObjectFactory<Object> objFactory = new 
ObjectFactory<>(recClass);
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
index 20980b1df2..b48ceacd20 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
@@ -31,6 +31,7 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
 import org.apache.ignite.internal.lang.IgniteExceptionMapperUtil;
+import org.apache.ignite.internal.marshaller.MarshallersProvider;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaRegistry;
@@ -69,6 +70,9 @@ abstract class AbstractTableView<R> implements 
CriteriaQuerySource<R> {
     /** Ignite SQL facade. */
     protected final IgniteSql sql;
 
+    /** Marshallers provider. */
+    protected final MarshallersProvider marshallers;
+
     /**
      * Constructor.
      *
@@ -76,13 +80,16 @@ abstract class AbstractTableView<R> implements 
CriteriaQuerySource<R> {
      * @param schemaVersions Schema versions access.
      * @param schemaReg Schema registry.
      * @param sql Ignite SQL facade.
+     * @param marshallers Marshallers provider.
      */
-    AbstractTableView(InternalTable tbl, SchemaVersions schemaVersions, 
SchemaRegistry schemaReg, IgniteSql sql) {
+    AbstractTableView(InternalTable tbl, SchemaVersions schemaVersions, 
SchemaRegistry schemaReg, IgniteSql sql,
+            MarshallersProvider marshallers) {
         this.tbl = tbl;
         this.schemaVersions = schemaVersions;
         this.sql = sql;
 
         this.rowConverter = new TableViewRowConverter(schemaReg);
+        this.marshallers = marshallers;
     }
 
     /**
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
index b529b950f5..3f5aee330c 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
@@ -28,6 +28,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Flow.Publisher;
 import java.util.function.Function;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
+import org.apache.ignite.internal.marshaller.MarshallersProvider;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowEx;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
@@ -66,10 +67,17 @@ public class KeyValueBinaryViewImpl extends 
AbstractTableView<Entry<Tuple, Tuple
      * @param tbl Table storage.
      * @param schemaReg Schema registry.
      * @param schemaVersions Schema versions access.
+     * @param marshallers Marshallers provider.
      * @param sql Ignite SQL facade.
      */
-    public KeyValueBinaryViewImpl(InternalTable tbl, SchemaRegistry schemaReg, 
SchemaVersions schemaVersions, IgniteSql sql) {
-        super(tbl, schemaVersions, schemaReg, sql);
+    public KeyValueBinaryViewImpl(
+            InternalTable tbl,
+            SchemaRegistry schemaReg,
+            SchemaVersions schemaVersions,
+            MarshallersProvider marshallers,
+            IgniteSql sql
+    ) {
+        super(tbl, schemaVersions, schemaReg, sql, marshallers);
 
         marshallerCache = new TupleMarshallerCache(schemaReg);
     }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
index 3b78c2b488..c98deebfa6 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
@@ -17,9 +17,6 @@
 
 package org.apache.ignite.internal.table;
 
-import static 
org.apache.ignite.internal.marshaller.Marshaller.createMarshaller;
-import static 
org.apache.ignite.internal.schema.marshaller.MarshallerUtil.toMarshallerColumns;
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -33,6 +30,8 @@ import java.util.function.Function;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.marshaller.Marshaller;
 import org.apache.ignite.internal.marshaller.MarshallerException;
+import org.apache.ignite.internal.marshaller.MarshallerSchema;
+import org.apache.ignite.internal.marshaller.MarshallersProvider;
 import org.apache.ignite.internal.marshaller.TupleReader;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowEx;
@@ -81,6 +80,7 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
      * @param tbl Table storage.
      * @param schemaRegistry Schema registry.
      * @param schemaVersions Schema versions access.
+     * @param marshallers Marshallers provider.
      * @param sql Ignite SQL facade.
      * @param keyMapper Key class mapper.
      * @param valueMapper Value class mapper.
@@ -89,16 +89,17 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
             InternalTable tbl,
             SchemaRegistry schemaRegistry,
             SchemaVersions schemaVersions,
+            MarshallersProvider marshallers,
             IgniteSql sql,
             Mapper<K> keyMapper,
             Mapper<V> valueMapper
     ) {
-        super(tbl, schemaVersions, schemaRegistry, sql);
+        super(tbl, schemaVersions, schemaRegistry, sql, marshallers);
 
         this.keyMapper = keyMapper;
         this.valueMapper = valueMapper;
 
-        marshallerFactory = (schema) -> new KvMarshallerImpl<>(schema, 
keyMapper, valueMapper);
+        marshallerFactory = (schema) -> new KvMarshallerImpl<>(schema, 
marshallers, keyMapper, valueMapper);
     }
 
     /** {@inheritDoc} */
@@ -473,8 +474,6 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
             return marsh;
         }
 
-        // TODO: Cache marshaller for schema version or upgrade row?
-
         SchemaRegistry registry = rowConverter.registry();
 
         marsh = marshallerFactory.apply(registry.schema(schemaVersion));
@@ -702,8 +701,9 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
         Column[] keyCols = schema.keyColumns().columns();
         Column[] valCols = schema.valueColumns().columns();
 
-        Marshaller keyMarsh = createMarshaller(toMarshallerColumns(keyCols), 
keyMapper, false, true);
-        Marshaller valMarsh = createMarshaller(toMarshallerColumns(valCols), 
valueMapper, false, true);
+        MarshallerSchema marshallerSchema = schema.marshallerSchema();
+        Marshaller keyMarsh = marshallers.getKeysMarshaller(marshallerSchema, 
keyMapper, false, true);
+        Marshaller valMarsh = 
marshallers.getValuesMarshaller(marshallerSchema, valueMapper, false, true);
 
         return (row) -> {
             try {
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
index 5156e0e1e0..714e8ba2e2 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Flow.Publisher;
+import org.apache.ignite.internal.marshaller.MarshallersProvider;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowEx;
 import org.apache.ignite.internal.schema.SchemaRegistry;
@@ -55,10 +56,17 @@ public class RecordBinaryViewImpl extends 
AbstractTableView<Tuple> implements Re
      * @param tbl The table.
      * @param schemaRegistry Table schema registry.
      * @param schemaVersions Schema versions access.
+     * @param marshallers Marshallers provider.
      * @param sql Ignite SQL facade.
      */
-    public RecordBinaryViewImpl(InternalTable tbl, SchemaRegistry 
schemaRegistry, SchemaVersions schemaVersions, IgniteSql sql) {
-        super(tbl, schemaVersions, schemaRegistry, sql);
+    public RecordBinaryViewImpl(
+            InternalTable tbl,
+            SchemaRegistry schemaRegistry,
+            SchemaVersions schemaVersions,
+            MarshallersProvider marshallers,
+            IgniteSql sql
+    ) {
+        super(tbl, schemaVersions, schemaRegistry, sql, marshallers);
 
         marshallerCache = new TupleMarshallerCache(schemaRegistry);
     }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
index 714de8e043..82049cc55e 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
@@ -17,9 +17,6 @@
 
 package org.apache.ignite.internal.table;
 
-import static 
org.apache.ignite.internal.marshaller.Marshaller.createMarshaller;
-import static 
org.apache.ignite.internal.schema.marshaller.MarshallerUtil.toMarshallerColumns;
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -29,6 +26,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Flow.Publisher;
 import java.util.function.Function;
 import org.apache.ignite.internal.marshaller.Marshaller;
+import org.apache.ignite.internal.marshaller.MarshallerSchema;
+import org.apache.ignite.internal.marshaller.MarshallersProvider;
 import org.apache.ignite.internal.marshaller.TupleReader;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowEx;
@@ -72,6 +71,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R> 
implements RecordVie
      * @param tbl Table.
      * @param schemaRegistry Schema registry.
      * @param schemaVersions Schema versions access.
+     * @param marshallers Marshallers provider.
      * @param mapper Record class mapper.
      * @param sql Ignite SQL facade.
      */
@@ -79,13 +79,14 @@ public class RecordViewImpl<R> extends AbstractTableView<R> 
implements RecordVie
             InternalTable tbl,
             SchemaRegistry schemaRegistry,
             SchemaVersions schemaVersions,
+            MarshallersProvider marshallers,
             Mapper<R> mapper,
             IgniteSql sql
     ) {
-        super(tbl, schemaVersions, schemaRegistry, sql);
+        super(tbl, schemaVersions, schemaRegistry, sql, marshallers);
 
         this.mapper = mapper;
-        marshallerFactory = (schema) -> new RecordMarshallerImpl<>(schema, 
mapper);
+        marshallerFactory = (schema) -> new RecordMarshallerImpl<>(schema, 
marshallers, mapper);
     }
 
     /** {@inheritDoc} */
@@ -369,8 +370,6 @@ public class RecordViewImpl<R> extends AbstractTableView<R> 
implements RecordVie
             return marsh;
         }
 
-        // TODO: Cache marshaller for schema version or upgrade row?
-
         SchemaDescriptor schema = 
rowConverter.registry().schema(schemaVersion);
 
         marsh = marshallerFactory.apply(schema);
@@ -542,8 +541,9 @@ public class RecordViewImpl<R> extends AbstractTableView<R> 
implements RecordVie
     /** {@inheritDoc} */
     @Override
     protected Function<SqlRow, R> queryMapper(ResultSetMetadata meta, 
SchemaDescriptor schema) {
+        MarshallerSchema marshallerSchema = schema.marshallerSchema();
+        Marshaller marsh = marshallers.getRowMarshaller(marshallerSchema, 
mapper, false, true);
         Column[] cols = ArrayUtils.concat(schema.keyColumns().columns(), 
schema.valueColumns().columns());
-        Marshaller marsh = createMarshaller(toMarshallerColumns(cols), mapper, 
false, true);
 
         return (row) -> {
             try {
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java 
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
index f12ddf0b31..11e42c6e73 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
@@ -29,6 +29,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.marshaller.MarshallerException;
+import org.apache.ignite.internal.marshaller.MarshallersProvider;
+import org.apache.ignite.internal.marshaller.ReflectionMarshallersProvider;
 import org.apache.ignite.internal.schema.BinaryRowEx;
 import org.apache.ignite.internal.schema.ColumnsExtractor;
 import org.apache.ignite.internal.schema.SchemaRegistry;
@@ -78,18 +80,28 @@ public class TableImpl implements TableViewInternal {
 
     private final Map<Integer, IndexWrapper> indexWrapperById = new 
ConcurrentHashMap<>();
 
+    private final MarshallersProvider marshallers;
+
     /**
      * Constructor.
      *
      * @param tbl The table.
      * @param lockManager Lock manager.
      * @param schemaVersions Schema versions access.
+     * @param marshallers Marshallers provider.
      * @param sql Ignite SQL facade.
      */
-    public TableImpl(InternalTable tbl, LockManager lockManager, 
SchemaVersions schemaVersions, IgniteSql sql) {
+    public TableImpl(
+            InternalTable tbl,
+            LockManager lockManager,
+            SchemaVersions schemaVersions,
+            MarshallersProvider marshallers,
+            IgniteSql sql
+    ) {
         this.tbl = tbl;
         this.lockManager = lockManager;
         this.schemaVersions = schemaVersions;
+        this.marshallers = marshallers;
         this.sql = sql;
     }
 
@@ -104,7 +116,7 @@ public class TableImpl implements TableViewInternal {
      */
     @TestOnly
     public TableImpl(InternalTable tbl, SchemaRegistry schemaReg, LockManager 
lockManager, SchemaVersions schemaVersions, IgniteSql sql) {
-        this(tbl, lockManager, schemaVersions, sql);
+        this(tbl, lockManager, schemaVersions, new 
ReflectionMarshallersProvider(), sql);
 
         this.schemaReg = schemaReg;
     }
@@ -154,22 +166,22 @@ public class TableImpl implements TableViewInternal {
 
     @Override
     public <R> RecordView<R> recordView(Mapper<R> recMapper) {
-        return new RecordViewImpl<>(tbl, schemaReg, schemaVersions, recMapper, sql);
+        return new RecordViewImpl<>(tbl, schemaReg, schemaVersions, marshallers, recMapper, sql);
     }
 
     @Override
     public RecordView<Tuple> recordView() {
-        return new RecordBinaryViewImpl(tbl, schemaReg, schemaVersions, sql);
+        return new RecordBinaryViewImpl(tbl, schemaReg, schemaVersions, marshallers, sql);
     }
 
     @Override
     public <K, V> KeyValueView<K, V> keyValueView(Mapper<K> keyMapper, Mapper<V> valMapper) {
-        return new KeyValueViewImpl<>(tbl, schemaReg, schemaVersions, sql, keyMapper, valMapper);
+        return new KeyValueViewImpl<>(tbl, schemaReg, schemaVersions, marshallers, sql, keyMapper, valMapper);
     }
 
     @Override
     public KeyValueView<Tuple, Tuple> keyValueView() {
-        return new KeyValueBinaryViewImpl(tbl, schemaReg, schemaVersions, sql);
+        return new KeyValueBinaryViewImpl(tbl, schemaReg, schemaVersions, marshallers, sql);
     }
 
     @Override
@@ -193,7 +205,7 @@ public class TableImpl implements TableViewInternal {
         Objects.requireNonNull(keyMapper);
 
         BinaryRowEx keyRow;
-        var marshaller = new KvMarshallerImpl<>(schemaReg.lastKnownSchema(), keyMapper, keyMapper);
+        var marshaller = new KvMarshallerImpl<>(schemaReg.lastKnownSchema(), marshallers, keyMapper, keyMapper);
         try {
             keyRow = marshaller.marshal(key);
         } catch (MarshallerException e) {
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 33e38bd7ab..c29244807c 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -106,6 +106,7 @@ import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.marshaller.ReflectionMarshallersProvider;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.WatchEvent;
@@ -364,6 +365,9 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
     /** Configuration for {@link StorageUpdateHandler}. */
     private final StorageUpdateConfiguration storageUpdateConfig;
 
+    /** Marshallers provider. */
+    private final ReflectionMarshallersProvider marshallers = new ReflectionMarshallersProvider();
+
     /**
      * Creates a new table manager.
      *
@@ -1234,7 +1238,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
                 placementDriver
         );
 
-        var table = new TableImpl(internalTable, lockMgr, schemaVersions, sql.get());
+        var table = new TableImpl(internalTable, lockMgr, schemaVersions, marshallers, sql.get());
 
         // TODO: IGNITE-18595 We need to do something different to wait for indexes before full rebalancing
         table.addIndexesToWait(collectTableIndexIds(tableId, catalogVersion, onNodeRecovery));
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueBinaryViewOperationsTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueBinaryViewOperationsTest.java
index 7eaca14a5e..53faae7d06 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueBinaryViewOperationsTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueBinaryViewOperationsTest.java
@@ -37,6 +37,7 @@ import static org.mockito.Mockito.verify;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import org.apache.ignite.internal.marshaller.ReflectionMarshallersProvider;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
@@ -451,9 +452,13 @@ public class KeyValueBinaryViewOperationsTest extends TableKvOperationsTestBase
     void retriesOnInternalSchemaVersionMismatchException() throws Exception {
         SchemaDescriptor schema = schemaDescriptor();
         InternalTable internalTable = spy(createInternalTable(schema));
+        ReflectionMarshallersProvider marshallers = new ReflectionMarshallersProvider();
 
         KeyValueView<Tuple, Tuple> view = new KeyValueBinaryViewImpl(
-                internalTable, new DummySchemaManagerImpl(schema), schemaVersions, mock(IgniteSql.class)
+                internalTable,
+                new DummySchemaManagerImpl(schema),
+                schemaVersions,
+                marshallers, mock(IgniteSql.class)
         );
 
         BinaryRow resultRow = new TupleMarshallerImpl(schema).marshal(Tuple.create().set("ID", 1L).set("VAL", 2L));
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueViewOperationsTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueViewOperationsTest.java
index a8034e55ad..b998131b97 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueViewOperationsTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueViewOperationsTest.java
@@ -58,6 +58,8 @@ import java.util.Objects;
 import java.util.Random;
 import java.util.Set;
 import java.util.stream.Collectors;
+import org.apache.ignite.internal.marshaller.MarshallersProvider;
+import org.apache.ignite.internal.marshaller.ReflectionMarshallersProvider;
 import org.apache.ignite.internal.marshaller.testobjects.TestObjectWithAllTypes;
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.MessagingService;
@@ -685,8 +687,9 @@ public class KeyValueViewOperationsTest extends TableKvOperationsTestBase {
 
         TestKeyObject key = new TestKeyObject(1);
         TestObjectWithAllTypes expectedValue = TestObjectWithAllTypes.randomObject(rnd);
+        MarshallersProvider marshallers = new ReflectionMarshallersProvider();
 
-        BinaryRow resultRow = new KvMarshallerImpl<>(schema, keyMapper, valMapper)
+        BinaryRow resultRow = new KvMarshallerImpl<>(schema, marshallers, keyMapper, valMapper)
                 .marshal(key, expectedValue);
 
         doReturn(failedFuture(new InternalSchemaVersionMismatchException()))
@@ -718,10 +721,12 @@ public class KeyValueViewOperationsTest extends TableKvOperationsTestBase {
 
         assertEquals(Collections.emptySet(), missedTypes);
 
+        ReflectionMarshallersProvider marshallers = new ReflectionMarshallersProvider();
         return new KeyValueViewImpl<>(
                 internalTable,
                 new DummySchemaManagerImpl(schema),
                 schemaVersions,
+                marshallers,
                 mock(IgniteSql.class),
                 keyMapper,
                 valMapper
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/RecordBinaryViewOperationsTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/RecordBinaryViewOperationsTest.java
index 295980048e..68f8f36742 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/RecordBinaryViewOperationsTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/RecordBinaryViewOperationsTest.java
@@ -40,6 +40,7 @@ import static org.mockito.Mockito.verify;
 
 import java.util.Collection;
 import java.util.List;
+import org.apache.ignite.internal.marshaller.ReflectionMarshallersProvider;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.InvalidTypeException;
@@ -583,9 +584,10 @@ public class RecordBinaryViewOperationsTest extends TableKvOperationsTestBase {
     void retriesOnInternalSchemaVersionMismatchException() throws Exception {
         SchemaDescriptor schema = schemaDescriptor();
         InternalTable internalTable = spy(createInternalTable(schema));
+        ReflectionMarshallersProvider marshallers = new ReflectionMarshallersProvider();
 
         RecordView<Tuple> view = new RecordBinaryViewImpl(
-                internalTable, new DummySchemaManagerImpl(schema), schemaVersions, mock(IgniteSql.class)
+                internalTable, new DummySchemaManagerImpl(schema), schemaVersions, marshallers, mock(IgniteSql.class)
         );
 
         BinaryRow resultRow = new TupleMarshallerImpl(schema).marshal(Tuple.create().set("id", 1L).set("val", 2L));
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/RecordViewOperationsTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/RecordViewOperationsTest.java
index bc43620fc9..19299fac82 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/RecordViewOperationsTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/RecordViewOperationsTest.java
@@ -57,6 +57,7 @@ import java.util.List;
 import java.util.Random;
 import java.util.Set;
 import java.util.stream.Collectors;
+import org.apache.ignite.internal.marshaller.ReflectionMarshallersProvider;
 import org.apache.ignite.internal.marshaller.testobjects.TestObjectWithAllTypes;
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.MessagingService;
@@ -353,8 +354,9 @@ public class RecordViewOperationsTest extends TableKvOperationsTestBase {
         RecordView<TestObjectWithAllTypes> view = recordView();
 
         TestObjectWithAllTypes expectedRecord = TestObjectWithAllTypes.randomObject(rnd);
+        ReflectionMarshallersProvider marshallers = new ReflectionMarshallersProvider();
 
-        BinaryRow resultRow = new RecordMarshallerImpl<>(schema, recMapper)
+        BinaryRow resultRow = new RecordMarshallerImpl<>(schema, marshallers, recMapper)
                 .marshal(expectedRecord);
 
         doReturn(failedFuture(new InternalSchemaVersionMismatchException()))
@@ -386,10 +388,13 @@ public class RecordViewOperationsTest extends TableKvOperationsTestBase {
 
         assertEquals(Collections.emptySet(), missedTypes);
 
+        ReflectionMarshallersProvider marshallers = new ReflectionMarshallersProvider();
+
         return new RecordViewImpl<>(
                 internalTable,
                 new DummySchemaManagerImpl(schema),
                 schemaVersions,
+                marshallers,
                 recMapper,
                 mock(IgniteSql.class)
         );

Reply via email to