This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new de146ce3c2 IGNITE-19453 Move raft command serialization before the 
action request creation. (#2772)
de146ce3c2 is described below

commit de146ce3c2842ed7b6f6dea6ad9e6cc2064f4a26
Author: Ivan Bessonov <bessonov...@gmail.com>
AuthorDate: Tue Nov 14 16:35:32 2023 +0300

    IGNITE-19453 Move raft command serialization before the action request 
creation. (#2772)
---
 .../impl/ItMetaStorageServicePersistenceTest.java  |   7 ++
 .../server/raft/ItMetaStorageRaftGroupTest.java    |  33 +++++--
 .../server/raft/MetaStorageListener.java           |   4 +-
 .../server/raft/MetaStorageWriteHandler.java       |   6 +-
 .../MessageDeserializerGenerator.java              |   6 +-
 .../serialization/MessageSerializerGenerator.java  |   7 +-
 .../ignite/network/annotations/Transient.java}     |  17 ++--
 modules/network/README.md                          |   3 +
 .../stream/DirectByteBufferStreamImplV1.java       |   3 +-
 .../internal/placementdriver/ActiveActorTest.java  |   8 +-
 .../placementdriver/PlacementDriverManager.java    |   3 +-
 .../apache/ignite/internal/raft/Marshaller.java    |  53 +++++++++++
 .../apache/ignite/internal/raft/RaftManager.java   |   7 +-
 .../ignite/internal/raft/RaftServiceFactory.java   |   4 +-
 .../internal/raft/service/BeforeApplyHandler.java  |   3 +-
 .../raft/server/ItJraftCounterServerTest.java      |  32 ++++---
 .../ignite/raft/server/ItJraftHlcServerTest.java   |   8 +-
 .../raft/server/ItSimpleCounterServerTest.java     |  15 +++-
 .../ignite/raft/server/JraftAbstractTest.java      |   5 +-
 .../java/org/apache/ignite/internal/raft/Loza.java |  34 +++++--
 .../ignite/internal/raft/RaftGroupServiceImpl.java |  20 ++++-
 .../internal/raft/server/RaftGroupOptions.java     |   2 +-
 .../ignite/internal/raft/server/RaftServer.java    |   1 +
 .../internal/raft/server/impl/JraftServerImpl.java |  15 ++--
 .../internal/raft/util/OptimizedMarshaller.java    |   2 +-
 .../raft/util/ThreadLocalOptimizedMarshaller.java  |   2 +-
 .../ignite/raft/jraft/option/NodeOptions.java      |   4 +-
 .../ignite/raft/jraft/rpc/WriteActionRequest.java  |  14 ++-
 .../jraft/rpc/impl/ActionRequestInterceptor.java   |  11 +--
 .../jraft/rpc/impl/ActionRequestProcessor.java     | 100 +++++++++++++++------
 .../impl/InterceptingActionRequestProcessor.java   |   8 +-
 .../rpc/impl/NullActionRequestInterceptor.java     |   3 +-
 .../ignite/raft/jraft/storage/io/MessageFile.java  |   7 +-
 .../snapshot/local/LocalSnapshotMetaTable.java     |   6 +-
 .../ignite/raft/jraft/util/JDKMarshaller.java      |   6 +-
 .../ignite/internal/raft/RaftGroupServiceTest.java |  24 +++--
 .../raft/jraft/rpc/AppendEntriesBenchmark.java     |  10 +--
 .../service/ItAbstractListenerSnapshotTest.java    |  16 ++--
 .../raft/client/TopologyAwareRaftGroupService.java |   7 +-
 .../TopologyAwareRaftGroupServiceFactory.java      |   7 +-
 .../client/TopologyAwareRaftGroupServiceTest.java  |  15 +++-
 .../internal/sql/engine/ItBuildIndexTest.java      |  10 ++-
 modules/table/build.gradle                         |   1 +
 .../ignite/distributed/ItTablePersistenceTest.java |   7 ++
 .../raftsnapshot/ItTableRaftSnapshotsTest.java     |  19 +++-
 .../internal/table/distributed/TableManager.java   |   6 +-
 .../distributed/command/CatalogVersionAware.java   |   7 ++
 .../distributed/command/PartitionCommand.java      |  12 ++-
 .../table/distributed/raft/PartitionListener.java  |   4 +-
 .../schema/CheckCatalogVersionOnActionRequest.java |  18 ++--
 .../schema/CheckCatalogVersionOnAppendEntries.java |  13 ++-
 .../schema/PartitionCommandsMarshaller.java        |   9 +-
 .../schema/PartitionCommandsMarshallerImpl.java    |  20 ++---
 .../table/distributed/TableManagerTest.java        |  10 ++-
 .../CheckCatalogVersionOnActionRequestTest.java    |  25 +++---
 .../PartitionCommandsMarshallerImplTest.java       |   2 +-
 .../apache/ignite/distributed/ItTxTestCluster.java |  13 +--
 57 files changed, 511 insertions(+), 203 deletions(-)

diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
index ea6930e0fa..bd4b87a4ab 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
@@ -32,11 +32,13 @@ import 
org.apache.ignite.internal.metastorage.server.KeyValueStorage;
 import 
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
 import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
 import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
+import org.apache.ignite.internal.raft.Marshaller;
 import org.apache.ignite.internal.raft.server.RaftServer;
 import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
 import org.apache.ignite.internal.raft.service.ItAbstractListenerSnapshotTest;
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller;
 import org.apache.ignite.internal.replicator.TestReplicationGroupId;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -158,6 +160,11 @@ public class ItMetaStorageServicePersistenceTest extends 
ItAbstractListenerSnaps
         return new TestReplicationGroupId("metastorage");
     }
 
+    @Override
+    protected Marshaller commandsMarshaller(ClusterService clusterService) {
+        return new 
ThreadLocalOptimizedMarshaller(clusterService.serializationRegistry());
+    }
+
     /**
      * Check meta storage entry.
      *
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java
index 3a84a2f05c..54e7d35bc0 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java
@@ -62,6 +62,7 @@ import 
org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.raft.server.RaftServer;
 import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.Cursor;
@@ -347,17 +348,22 @@ public class ItMetaStorageRaftGroupTest extends 
IgniteAbstractTest {
 
         assertTrue(cluster.size() > 1);
 
+        var commandsMarshaller = new 
ThreadLocalOptimizedMarshaller(cluster.get(0).serializationRegistry());
+
         NodeOptions opt1 = new NodeOptions();
         opt1.setReplicationStateListeners(
                 List.of(new 
UserReplicatorStateListener(replicatorStartedCounter, 
replicatorStoppedCounter)));
+        opt1.setCommandsMarshaller(commandsMarshaller);
 
         NodeOptions opt2 = new NodeOptions();
         opt2.setReplicationStateListeners(
                 List.of(new 
UserReplicatorStateListener(replicatorStartedCounter, 
replicatorStoppedCounter)));
+        opt2.setCommandsMarshaller(commandsMarshaller);
 
         NodeOptions opt3 = new NodeOptions();
         opt3.setReplicationStateListeners(
                 List.of(new 
UserReplicatorStateListener(replicatorStartedCounter, 
replicatorStoppedCounter)));
+        opt3.setCommandsMarshaller(commandsMarshaller);
 
         metaStorageRaftSrv1 = new JraftServerImpl(cluster.get(0), 
workDir.resolve("node1"), opt1, new RaftGroupEventsClientListener());
 
@@ -386,35 +392,38 @@ public class ItMetaStorageRaftGroupTest extends 
IgniteAbstractTest {
         metaStorageRaftSrv3.startRaftNode(raftNodeId3, membersConfiguration,
                 new MetaStorageListener(mockStorage, 
mock(ClusterTimeImpl.class)), defaults());
 
-        metaStorageRaftGrpSvc1 = RaftGroupServiceImpl.start(
+        metaStorageRaftGrpSvc1 = 
waitForRaftGroupServiceSafely(RaftGroupServiceImpl.start(
                 MetastorageGroupId.INSTANCE,
                 cluster.get(0),
                 FACTORY,
                 raftConfiguration,
                 membersConfiguration,
                 true,
-                executor
-        ).get();
+                executor,
+                commandsMarshaller
+        ));
 
-        metaStorageRaftGrpSvc2 = RaftGroupServiceImpl.start(
+        metaStorageRaftGrpSvc2 = 
waitForRaftGroupServiceSafely(RaftGroupServiceImpl.start(
                 MetastorageGroupId.INSTANCE,
                 cluster.get(1),
                 FACTORY,
                 raftConfiguration,
                 membersConfiguration,
                 true,
-                executor
-        ).get();
+                executor,
+                commandsMarshaller
+        ));
 
-        metaStorageRaftGrpSvc3 = RaftGroupServiceImpl.start(
+        metaStorageRaftGrpSvc3 = 
waitForRaftGroupServiceSafely(RaftGroupServiceImpl.start(
                 MetastorageGroupId.INSTANCE,
                 cluster.get(2),
                 FACTORY,
                 raftConfiguration,
                 membersConfiguration,
                 true,
-                executor
-        ).get();
+                executor,
+                commandsMarshaller
+        ));
 
         assertTrue(waitForCondition(
                         () -> sameLeaders(metaStorageRaftGrpSvc1, 
metaStorageRaftGrpSvc2, metaStorageRaftGrpSvc3), 10_000),
@@ -430,6 +439,12 @@ public class ItMetaStorageRaftGroupTest extends 
IgniteAbstractTest {
         return raftServersRaftGroups;
     }
 
+    private static RaftGroupService 
waitForRaftGroupServiceSafely(CompletableFuture<RaftGroupService> future) {
+        assertThat(future, willCompleteSuccessfully());
+
+        return future.join();
+    }
+
     /**
      * Checks if all raft groups have the same leader.
      *
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
index d3b5043d97..43cf5624e0 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
@@ -154,8 +154,8 @@ public class MetaStorageListener implements 
RaftGroupListener, BeforeApplyHandle
     }
 
     @Override
-    public void onBeforeApply(Command command) {
-        writeHandler.beforeApply(command);
+    public boolean onBeforeApply(Command command) {
+        return writeHandler.beforeApply(command);
     }
 
     @Override
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
index 91feed7beb..3648d86827 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
@@ -294,7 +294,7 @@ public class MetaStorageWriteHandler {
         }
     }
 
-    void beforeApply(Command command) {
+    boolean beforeApply(Command command) {
         if (command instanceof MetaStorageWriteCommand) {
             // Initiator sends us a timestamp to adjust to.
             // Alter command by setting safe time based on the adjusted clock.
@@ -303,6 +303,10 @@ public class MetaStorageWriteHandler {
             clusterTime.adjust(writeCommand.initiatorTime());
 
             writeCommand.safeTimeLong(clusterTime.nowLong());
+
+            return true;
         }
+
+        return false;
     }
 }
diff --git 
a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageDeserializerGenerator.java
 
b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageDeserializerGenerator.java
index e753dfb211..c9888afc2d 100644
--- 
a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageDeserializerGenerator.java
+++ 
b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageDeserializerGenerator.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.network.processor.serialization;
 
+import static java.util.stream.Collectors.toList;
 import static 
org.apache.ignite.internal.network.processor.messages.MessageImplGenerator.getByteArrayFieldName;
 
 import com.squareup.javapoet.ArrayTypeName;
@@ -35,6 +36,7 @@ import javax.tools.Diagnostic;
 import org.apache.ignite.internal.network.processor.MessageClass;
 import org.apache.ignite.internal.network.processor.MessageGroupWrapper;
 import org.apache.ignite.network.annotations.Marshallable;
+import org.apache.ignite.network.annotations.Transient;
 import org.apache.ignite.network.serialization.MessageDeserializer;
 import org.apache.ignite.network.serialization.MessageMappingException;
 import org.apache.ignite.network.serialization.MessageReader;
@@ -118,7 +120,9 @@ public class MessageDeserializerGenerator {
                 .addParameter(MessageReader.class, "reader")
                 .addException(MessageMappingException.class);
 
-        List<ExecutableElement> getters = message.getters();
+        List<ExecutableElement> getters = message.getters().stream()
+                .filter(e -> e.getAnnotation(Transient.class) == null)
+                .collect(toList());
 
         method
                 .beginControlFlow("if (!reader.beforeMessageRead())")
diff --git 
a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageSerializerGenerator.java
 
b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageSerializerGenerator.java
index 36b90a4cef..9692ef9a10 100644
--- 
a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageSerializerGenerator.java
+++ 
b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageSerializerGenerator.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.network.processor.serialization;
 
+import static java.util.stream.Collectors.toList;
+
 import com.squareup.javapoet.ClassName;
 import com.squareup.javapoet.CodeBlock;
 import com.squareup.javapoet.MethodSpec;
@@ -30,6 +32,7 @@ import javax.tools.Diagnostic;
 import org.apache.ignite.internal.network.processor.MessageClass;
 import org.apache.ignite.internal.network.processor.MessageGroupWrapper;
 import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Transient;
 import org.apache.ignite.network.serialization.MessageMappingException;
 import org.apache.ignite.network.serialization.MessageSerializer;
 import org.apache.ignite.network.serialization.MessageWriter;
@@ -87,7 +90,9 @@ public class MessageSerializerGenerator {
 
         method.addStatement("$T message = ($T) msg", message.implClassName(), 
message.implClassName()).addCode("\n");
 
-        List<ExecutableElement> getters = message.getters();
+        List<ExecutableElement> getters = message.getters().stream()
+                .filter(e -> e.getAnnotation(Transient.class) == null)
+                .collect(toList());
 
         method
                 .beginControlFlow("if (!writer.isHeaderWritten())")
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Marshaller.java 
b/modules/network-api/src/main/java/org/apache/ignite/network/annotations/Transient.java
similarity index 64%
rename from 
modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Marshaller.java
rename to 
modules/network-api/src/main/java/org/apache/ignite/network/annotations/Transient.java
index cb4891e9cb..ecba91ed8d 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Marshaller.java
+++ 
b/modules/network-api/src/main/java/org/apache/ignite/network/annotations/Transient.java
@@ -14,14 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ignite.raft.jraft.util;
 
-import java.nio.ByteBuffer;
+package org.apache.ignite.network.annotations;
 
-public interface Marshaller {
-    public static Marshaller DEFAULT = new JDKMarshaller();
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
 
-    byte[] marshall(Object o);
+/**
+ * Annotation for methods of {@link Transferable} interface, that makes 
generated serializer/deserializer ignore the property.
+ */
+@Target(ElementType.METHOD)
+@Retention(RetentionPolicy.SOURCE)
+public @interface Transient {
 
-    <T> T unmarshall(ByteBuffer raw);
 }
diff --git a/modules/network/README.md b/modules/network/README.md
index 12645de7b4..d189153aaa 100644
--- a/modules/network/README.md
+++ b/modules/network/README.md
@@ -18,6 +18,9 @@ Supported types:
  + `java.util.Map<K, V>` where `K` and `V` can be any supported type
  + Arrays of all supported types
 
+If one wants to have a property in the message, that shouldn't be serialized, 
`@Transient` annotation should be used.
+Such properties will still be available in builder, like the rest.
+
 ## Threading
 Every Ignite node has three network thread pool executors and thread naming 
formats:
 + Client worker - handles channel events on a client 
(`{consistentId}-client-X`)
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java
index fe3416e462..1014c85241 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java
@@ -46,6 +46,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.RandomAccess;
 import java.util.Set;
 import java.util.UUID;
@@ -186,7 +187,7 @@ public class DirectByteBufferStreamImplV1 implements 
DirectByteBufferStream {
      * @param serializationRegistry Serialization service.       .
      */
     public DirectByteBufferStreamImplV1(MessageSerializationRegistry 
serializationRegistry) {
-        this.serializationRegistry = serializationRegistry;
+        this.serializationRegistry = 
Objects.requireNonNull(serializationRegistry, "serializationRegistry");
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
index 66feec2163..210dce87a1 100644
--- 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
+++ 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
@@ -71,6 +71,7 @@ import 
org.apache.ignite.internal.raft.server.RaftGroupOptions;
 import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
 import org.apache.ignite.internal.raft.service.CommandClosure;
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
+import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
@@ -465,10 +466,12 @@ public class ActiveActorTest extends IgniteAbstractTest {
 
                 var dataPath = workDir.resolve("raft_" + 
localPeer.consistentId());
 
+                NodeOptions nodeOptions = new NodeOptions();
+                nodeOptions.setCommandsMarshaller(new 
ThreadLocalOptimizedMarshaller(cluster.serializationRegistry()));
                 var raftServer = new JraftServerImpl(
                         cluster,
                         dataPath,
-                        new NodeOptions(),
+                        nodeOptions,
                         eventsClientListener
                 );
                 raftServer.start();
@@ -527,7 +530,8 @@ public class ActiveActorTest extends IgniteAbstractTest {
                 executor,
                 new LogicalTopologyServiceTestImpl(localClusterService),
                 eventsClientListener,
-                notifyOnSubscription
+                notifyOnSubscription,
+                new 
ThreadLocalOptimizedMarshaller(localClusterService.serializationRegistry())
         ).join();
     }
 
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
index 51526218e0..1b2a82f707 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
@@ -150,7 +150,8 @@ public class PlacementDriverManager implements 
IgniteComponent {
                             return raftManager.startRaftGroupService(
                                     replicationGroupId,
                                     
PeersAndLearners.fromConsistentIds(placementDriverNodes),
-                                    topologyAwareRaftGroupServiceFactory
+                                    topologyAwareRaftGroupServiceFactory,
+                                    null // Use default commands marshaller.
                             ).thenCompose(client -> 
client.subscribeLeader(this::onLeaderChange).thenApply(v -> client));
                         } catch (NodeStoppingException e) {
                             return failedFuture(e);
diff --git 
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/Marshaller.java
 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/Marshaller.java
new file mode 100644
index 0000000000..a41a31268a
--- /dev/null
+++ 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/Marshaller.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Marshaller interface, for instances that convert objects to {@code byte[]} 
and back.
+ */
+public interface Marshaller {
+    /**
+     * Converts an object into a byte array.
+     *
+     * @param o Object to serialize.
+     * @return Byte buffer with a serialized object.
+     */
+    byte[] marshall(Object o);
+
+    /**
+     * Converts byte buffer back to an object.
+     *
+     * @param raw Byte buffer with a serialized object.
+     * @param <T> Generic type for avoiding explicit cast of the result 
instance.
+     * @return Deserialized object.
+     */
+    <T> T unmarshall(ByteBuffer raw);
+
+    /**
+     * Overloaded {@link #unmarshall(ByteBuffer)}, which can be used to avoid 
manual {@link ByteBuffer#wrap(byte[])} calls.
+     *
+     * @param bytes Array with a serialized object.
+     * @param <T> Generic type for avoiding explicit cast of the result 
instance.
+     * @return Deserialized object.
+     */
+    default <T> T unmarshall(byte[] bytes) {
+        return unmarshall(ByteBuffer.wrap(bytes));
+    }
+}
diff --git 
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java
 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java
index e9d03e6408..a9cfff5c35 100644
--- 
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java
+++ 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java
@@ -23,6 +23,8 @@ import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
 
 /**
  * Raft manager.
@@ -148,6 +150,7 @@ public interface RaftManager extends IgniteComponent {
      * @return Future that will be completed with an instance of a Raft group 
service.
      * @throws NodeStoppingException If node stopping intention was detected.
      */
+    @TestOnly
     CompletableFuture<RaftGroupService> startRaftGroupService(
             ReplicationGroupId groupId,
             PeersAndLearners configuration
@@ -159,12 +162,14 @@ public interface RaftManager extends IgniteComponent {
      * @param groupId Raft group ID.
      * @param configuration Peers and Learners of the Raft group.
      * @param factory Factory that should be used to create raft service.
+     * @param commandsMarshaller Marshaller that should be used to serialize 
commands. {@code null} if default marshaller should be used.
      * @return Future that will be completed with an instance of a Raft group 
service.
      * @throws NodeStoppingException If node stopping intention was detected.
      */
     <T extends RaftGroupService> CompletableFuture<T> startRaftGroupService(
             ReplicationGroupId groupId,
             PeersAndLearners configuration,
-            RaftServiceFactory<T> factory
+            RaftServiceFactory<T> factory,
+            @Nullable Marshaller commandsMarshaller
     ) throws NodeStoppingException;
 }
diff --git 
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftServiceFactory.java
 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftServiceFactory.java
index ecea7e524d..c800a07cb9 100644
--- 
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftServiceFactory.java
+++ 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftServiceFactory.java
@@ -34,12 +34,14 @@ public interface RaftServiceFactory<T extends 
RaftGroupService> {
      * @param peersAndLearners Peers configuration.
      * @param raftConfiguration Raft configuration.
      * @param raftClientExecutor Client executor.
+     * @param commandsMarshaller Marshaller that should be used to serialize 
commands.
      * @return Future that contains client when completes.
      */
     CompletableFuture<T> startRaftGroupService(
             ReplicationGroupId groupId,
             PeersAndLearners peersAndLearners,
             RaftConfiguration raftConfiguration,
-            ScheduledExecutorService raftClientExecutor
+            ScheduledExecutorService raftClientExecutor,
+            Marshaller commandsMarshaller
     );
 }
diff --git 
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/BeforeApplyHandler.java
 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/BeforeApplyHandler.java
index c8fd7ca1b4..ef78c7ffb5 100644
--- 
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/BeforeApplyHandler.java
+++ 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/BeforeApplyHandler.java
@@ -38,6 +38,7 @@ public interface BeforeApplyHandler {
      * this is a place to do it.
      *
      * @param command The command.
+     * @return {@code true} if command has been changed, i.e. its fields have 
been modified. {@code false} otherwise.
      */
-    void onBeforeApply(Command command);
+    boolean onBeforeApply(Command command);
 }
diff --git 
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
 
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
index da083d2c31..9835fc8f6f 100644
--- 
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
+++ 
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.raft.server.RaftServer;
 import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
 import org.apache.ignite.internal.raft.service.CommandClosure;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TestReplicationGroupId;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -108,11 +109,13 @@ class ItJraftCounterServerTest extends JraftAbstractTest {
 
                 Peer serverPeer = initialMembersConf.peer(localNodeName);
 
+                RaftGroupOptions groupOptions = groupOptions(raftServer);
+
                 raftServer.startRaftNode(
-                        new RaftNodeId(COUNTER_GROUP_0, serverPeer), 
initialMembersConf, listenerFactory.get(), defaults()
+                        new RaftNodeId(COUNTER_GROUP_0, serverPeer), 
initialMembersConf, listenerFactory.get(), groupOptions
                 );
                 raftServer.startRaftNode(
-                        new RaftNodeId(COUNTER_GROUP_1, serverPeer), 
initialMembersConf, listenerFactory.get(), defaults()
+                        new RaftNodeId(COUNTER_GROUP_1, serverPeer), 
initialMembersConf, listenerFactory.get(), groupOptions
                 );
             }, opts -> {});
         }
@@ -131,7 +134,7 @@ class ItJraftCounterServerTest extends JraftAbstractTest {
 
             var nodeId = new RaftNodeId(new 
TestReplicationGroupId("test_raft_group"), 
initialMembersConf.peer(localNodeName));
 
-            raftServer.startRaftNode(nodeId, initialMembersConf, 
listenerFactory.get(), defaults());
+            raftServer.startRaftNode(nodeId, initialMembersConf, 
listenerFactory.get(), groupOptions(raftServer));
         }, opts -> {});
 
         Set<Thread> threads = getAllDisruptorCurrentThreads();
@@ -150,7 +153,7 @@ class ItJraftCounterServerTest extends JraftAbstractTest {
             for (int i = 0; i < 10; i++) {
                 var nodeId = new RaftNodeId(new 
TestReplicationGroupId("test_raft_group_" + i), serverPeer);
 
-                srv.startRaftNode(nodeId, initialMembersConf, 
listenerFactory.get(), defaults());
+                srv.startRaftNode(nodeId, initialMembersConf, 
listenerFactory.get(), groupOptions(srv));
             }
         });
 
@@ -535,7 +538,8 @@ class ItJraftCounterServerTest extends JraftAbstractTest {
 
                         Peer serverPeer = 
initialMembersConf.peer(localNodeName);
 
-                        srv.startRaftNode(new RaftNodeId(groupId, serverPeer), 
initialMembersConf, listenerFactory.get(), defaults());
+                        RaftGroupOptions groupOptions = groupOptions(srv);
+                        srv.startRaftNode(new RaftNodeId(groupId, serverPeer), 
initialMembersConf, listenerFactory.get(), groupOptions);
                     }
                 }));
             }
@@ -592,7 +596,8 @@ class ItJraftCounterServerTest extends JraftAbstractTest {
 
                 var listener = new UpdateCountRaftListener(counter, 
snapshotDataStorage);
 
-                RaftGroupOptions opts = defaults().snapshotStorageFactory(new 
SnapshotInMemoryStorageFactory(snapshotMetaStorage));
+                RaftGroupOptions opts = groupOptions(raftServer)
+                        .snapshotStorageFactory(new 
SnapshotInMemoryStorageFactory(snapshotMetaStorage));
 
                 raftServer.startRaftNode(new RaftNodeId(grpId, serverPeer), 
initialMembersConf, listener, opts);
             }, opts -> {});
@@ -654,7 +659,8 @@ class ItJraftCounterServerTest extends JraftAbstractTest {
 
                 var listener = new UpdateCountRaftListener(counter, 
snapshotDataStorage);
 
-                RaftGroupOptions opts = defaults().snapshotStorageFactory(new 
SnapshotInMemoryStorageFactory(snapshotMetaStorage));
+                RaftGroupOptions opts = groupOptions(raftServer)
+                        .snapshotStorageFactory(new 
SnapshotInMemoryStorageFactory(snapshotMetaStorage));
 
                 raftServer.startRaftNode(new RaftNodeId(grpId, serverPeer), 
initialMembersConf, listener, opts);
 
@@ -789,8 +795,8 @@ class ItJraftCounterServerTest extends JraftAbstractTest {
 
             Peer serverPeer = initialMembersConf.peer(localNodeName);
 
-            r.startRaftNode(new RaftNodeId(COUNTER_GROUP_0, serverPeer), 
initialMembersConf, listenerFactory.get(), defaults());
-            r.startRaftNode(new RaftNodeId(COUNTER_GROUP_1, serverPeer), 
initialMembersConf, listenerFactory.get(), defaults());
+            r.startRaftNode(new RaftNodeId(COUNTER_GROUP_0, serverPeer), 
initialMembersConf, listenerFactory.get(), groupOptions(r));
+            r.startRaftNode(new RaftNodeId(COUNTER_GROUP_1, serverPeer), 
initialMembersConf, listenerFactory.get(), groupOptions(r));
         }, opts -> {});
 
         waitForCondition(() -> validateStateMachine(sum(20), svc2, 
COUNTER_GROUP_0), 5_000);
@@ -808,8 +814,8 @@ class ItJraftCounterServerTest extends JraftAbstractTest {
 
             Peer serverPeer = initialMembersConf.peer(localNodeName);
 
-            r.startRaftNode(new RaftNodeId(COUNTER_GROUP_0, serverPeer), 
initialMembersConf, listenerFactory.get(), defaults());
-            r.startRaftNode(new RaftNodeId(COUNTER_GROUP_1, serverPeer), 
initialMembersConf, listenerFactory.get(), defaults());
+            r.startRaftNode(new RaftNodeId(COUNTER_GROUP_0, serverPeer), 
initialMembersConf, listenerFactory.get(), groupOptions(r));
+            r.startRaftNode(new RaftNodeId(COUNTER_GROUP_1, serverPeer), 
initialMembersConf, listenerFactory.get(), groupOptions(r));
         }, opts -> {});
 
         waitForCondition(() -> validateStateMachine(sum(20), svc3, 
COUNTER_GROUP_0), 5_000);
@@ -872,4 +878,8 @@ class ItJraftCounterServerTest extends JraftAbstractTest {
         clients.get(0).<Long>run(incrementAndGetCommand(1)).get();
         assertEquals(index + 1, clients.get(0).readIndex().join());
     }
+
+    private static RaftGroupOptions groupOptions(RaftServer raftServer) {
+        return defaults().commandsMarshaller(new 
ThreadLocalOptimizedMarshaller(raftServer.clusterService().serializationRegistry()));
+    }
 }
diff --git 
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftHlcServerTest.java
 
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftHlcServerTest.java
index f024ca4e52..2fcbeef9f4 100644
--- 
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftHlcServerTest.java
+++ 
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftHlcServerTest.java
@@ -24,6 +24,7 @@ import static 
org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
 import static org.apache.ignite.raft.jraft.test.TestUtils.getLocalAddress;
 import static org.apache.ignite.raft.jraft.test.TestUtils.waitForTopology;
+import static 
org.apache.ignite.utils.ClusterServiceTestUtils.defaultSerializationRegistry;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -39,6 +40,7 @@ import org.apache.ignite.internal.raft.PeersAndLearners;
 import org.apache.ignite.internal.raft.RaftNodeId;
 import org.apache.ignite.internal.raft.server.RaftServer;
 import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller;
 import org.apache.ignite.internal.replicator.TestReplicationGroupId;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NetworkAddress;
@@ -157,6 +159,8 @@ class ItJraftHlcServerTest extends RaftServerAbstractTest {
      */
     @Test
     public void testHlcOneInstancePerIgniteNode() {
+        ThreadLocalOptimizedMarshaller commandsMarshaller = new 
ThreadLocalOptimizedMarshaller(defaultSerializationRegistry());
+
         startServer(0, raftServer -> {
             String localNodeName = 
raftServer.clusterService().topologyService().localMember().name();
 
@@ -164,7 +168,7 @@ class ItJraftHlcServerTest extends RaftServerAbstractTest {
 
             var nodeId = new RaftNodeId(new 
TestReplicationGroupId("test_raft_group"), localNode);
 
-            raftServer.startRaftNode(nodeId, initialConf, 
listenerFactory.get(), defaults());
+            raftServer.startRaftNode(nodeId, initialConf, 
listenerFactory.get(), defaults().commandsMarshaller(commandsMarshaller));
         }, opts -> {});
 
         servers.forEach(srv -> {
@@ -175,7 +179,7 @@ class ItJraftHlcServerTest extends RaftServerAbstractTest {
             for (int i = 0; i < 5; i++) {
                 var nodeId = new RaftNodeId(new 
TestReplicationGroupId("test_raft_group_" + i), localNode);
 
-                srv.startRaftNode(nodeId, initialConf, listenerFactory.get(), 
defaults());
+                srv.startRaftNode(nodeId, initialConf, listenerFactory.get(), 
defaults().commandsMarshaller(commandsMarshaller));
             }
         });
 
diff --git 
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java
 
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java
index 25770b4f4f..1520eb057b 100644
--- 
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java
+++ 
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java
@@ -39,9 +39,11 @@ import org.apache.ignite.internal.raft.PeersAndLearners;
 import org.apache.ignite.internal.raft.RaftGroupServiceImpl;
 import org.apache.ignite.internal.raft.RaftNodeId;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.raft.server.RaftGroupOptions;
 import org.apache.ignite.internal.raft.server.RaftServer;
 import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller;
 import org.apache.ignite.internal.replicator.TestReplicationGroupId;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -115,11 +117,16 @@ class ItSimpleCounterServerTest extends 
RaftServerAbstractTest {
 
         Peer serverPeer = memberConfiguration.peer(serverNodeName);
 
+        // Short name for long lines later in code.
+        var cmdMarshaller = new 
ThreadLocalOptimizedMarshaller(service.serializationRegistry());
+
+        RaftGroupOptions grpOptions = 
defaults().commandsMarshaller(cmdMarshaller);
+
         assertTrue(
-                server.startRaftNode(new RaftNodeId(COUNTER_GROUP_ID_0, 
serverPeer), memberConfiguration, new CounterListener(), defaults())
+                server.startRaftNode(new RaftNodeId(COUNTER_GROUP_ID_0, 
serverPeer), memberConfiguration, new CounterListener(), grpOptions)
         );
         assertTrue(
-                server.startRaftNode(new RaftNodeId(COUNTER_GROUP_ID_1, 
serverPeer), memberConfiguration, new CounterListener(), defaults())
+                server.startRaftNode(new RaftNodeId(COUNTER_GROUP_ID_1, 
serverPeer), memberConfiguration, new CounterListener(), grpOptions)
         );
 
         ClusterService clientNode1 = clusterService(PORT + 1, List.of(addr), 
true);
@@ -127,13 +134,13 @@ class ItSimpleCounterServerTest extends 
RaftServerAbstractTest {
         executor = new ScheduledThreadPoolExecutor(20, new 
NamedThreadFactory(Loza.CLIENT_POOL_NAME, logger()));
 
         client1 = RaftGroupServiceImpl
-                .start(COUNTER_GROUP_ID_0, clientNode1, FACTORY, 
raftConfiguration, memberConfiguration, false, executor)
+                .start(COUNTER_GROUP_ID_0, clientNode1, FACTORY, 
raftConfiguration, memberConfiguration, false, executor, cmdMarshaller)
                 .get(3, TimeUnit.SECONDS);
 
         ClusterService clientNode2 = clusterService(PORT + 2, List.of(addr), 
true);
 
         client2 = RaftGroupServiceImpl
-                .start(COUNTER_GROUP_ID_1, clientNode2, FACTORY, 
raftConfiguration, memberConfiguration, false, executor)
+                .start(COUNTER_GROUP_ID_1, clientNode2, FACTORY, 
raftConfiguration, memberConfiguration, false, executor, cmdMarshaller)
                 .get(3, TimeUnit.SECONDS);
 
         assertTrue(waitForTopology(service, 3, 10_000));
diff --git 
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/JraftAbstractTest.java
 
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/JraftAbstractTest.java
index 21207db67f..52bb3d7ca5 100644
--- 
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/JraftAbstractTest.java
+++ 
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/JraftAbstractTest.java
@@ -43,6 +43,7 @@ import 
org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.raft.server.RaftServer;
 import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -212,8 +213,10 @@ public abstract class JraftAbstractTest extends 
RaftServerAbstractTest {
 
         ClusterService clientNode = clusterService(CLIENT_PORT + 
clients.size(), List.of(addr), true);
 
+        var commandsMarshaller = new 
ThreadLocalOptimizedMarshaller(clientNode.serializationRegistry());
+
         RaftGroupService client = RaftGroupServiceImpl
-                .start(groupId, clientNode, FACTORY, raftConfiguration, 
configuration, false, executor)
+                .start(groupId, clientNode, FACTORY, raftConfiguration, 
configuration, false, executor, commandsMarshaller)
                 .get(3, TimeUnit.SECONDS);
 
         clients.add(client);
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index a232853291..b6a80931c0 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.raft;
 
+import static java.util.Objects.requireNonNullElse;
+
 import java.nio.file.Path;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -38,6 +40,7 @@ import org.apache.ignite.internal.raft.server.RaftServer;
 import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
@@ -113,6 +116,7 @@ public class Loza implements RaftManager {
         NodeOptions options = new NodeOptions();
 
         options.setClock(clock);
+        options.setCommandsMarshaller(new 
ThreadLocalOptimizedMarshaller(clusterNetSvc.serializationRegistry()));
 
         this.opts = options;
 
@@ -300,7 +304,9 @@ public class Loza implements RaftManager {
                     configuration,
                     lsnr,
                     eventsLsnr,
-                    
RaftGroupOptions.defaults().ownFsmCallerExecutorDisruptorConfig(disruptorConfiguration),
+                    // Use default marshaller here, because this particular 
method is used in very specific circumstances.
+                    RaftGroupOptions.defaults()
+                            
.ownFsmCallerExecutorDisruptorConfig(disruptorConfiguration),
                     factory
             );
 
@@ -313,6 +319,7 @@ public class Loza implements RaftManager {
         }
     }
 
+    @TestOnly
     @Override
     public CompletableFuture<RaftGroupService> startRaftGroupService(
             ReplicationGroupId groupId,
@@ -323,7 +330,8 @@ public class Loza implements RaftManager {
         }
 
         try {
-            return startRaftGroupServiceInternal(groupId, configuration);
+            // Use default command marshaller here.
+            return startRaftGroupServiceInternal(groupId, configuration, 
opts.getCommandsMarshaller());
         } finally {
             busyLock.leaveBusy();
         }
@@ -333,14 +341,19 @@ public class Loza implements RaftManager {
     public <T extends RaftGroupService> CompletableFuture<T> 
startRaftGroupService(
             ReplicationGroupId groupId,
             PeersAndLearners configuration,
-            RaftServiceFactory<T> factory
+            RaftServiceFactory<T> factory,
+            @Nullable Marshaller commandsMarshaller
     ) throws NodeStoppingException {
         if (!busyLock.enterBusy()) {
             throw new NodeStoppingException();
         }
 
         try {
-            return factory.startRaftGroupService(groupId, configuration, 
raftConfiguration, executor);
+            if (commandsMarshaller == null) {
+                commandsMarshaller = opts.getCommandsMarshaller();
+            }
+
+            return factory.startRaftGroupService(groupId, configuration, 
raftConfiguration, executor, commandsMarshaller);
         } finally {
             busyLock.leaveBusy();
         }
@@ -367,13 +380,17 @@ public class Loza implements RaftManager {
             ));
         }
 
+        Marshaller cmdMarshaller = 
requireNonNullElse(groupOptions.commandsMarshaller(), 
opts.getCommandsMarshaller());
+
         return raftServiceFactory == null
-                ? (CompletableFuture<T>) 
startRaftGroupServiceInternal(nodeId.groupId(), configuration)
-                : raftServiceFactory.startRaftGroupService(nodeId.groupId(), 
configuration, raftConfiguration, executor);
+                ? (CompletableFuture<T>) 
startRaftGroupServiceInternal(nodeId.groupId(), configuration, cmdMarshaller)
+                : raftServiceFactory.startRaftGroupService(nodeId.groupId(), 
configuration, raftConfiguration, executor, cmdMarshaller);
     }
 
     private CompletableFuture<RaftGroupService> startRaftGroupServiceInternal(
-            ReplicationGroupId grpId, PeersAndLearners membersConfiguration
+            ReplicationGroupId grpId,
+            PeersAndLearners membersConfiguration,
+            Marshaller commandsMarshaller
     ) {
         return RaftGroupServiceImpl.start(
                 grpId,
@@ -382,7 +399,8 @@ public class Loza implements RaftManager {
                 raftConfiguration,
                 membersConfiguration,
                 true,
-                executor
+                executor,
+                commandsMarshaller
         );
     }
 
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
index 66ca55ccbf..f0f3ec1304 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
@@ -111,6 +111,8 @@ public class RaftGroupServiceImpl implements 
RaftGroupService {
     /** Executor for scheduling retries of {@link 
RaftGroupServiceImpl#sendWithRetry} invocations. */
     private final ScheduledExecutorService executor;
 
+    private final Marshaller commandsMarshaller;
+
     /** Busy lock. */
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
 
@@ -124,6 +126,7 @@ public class RaftGroupServiceImpl implements 
RaftGroupService {
      * @param membersConfiguration Raft members configuration.
      * @param leader Group leader.
      * @param executor Executor for retrying requests.
+     * @param commandsMarshaller Marshaller that should be used to 
serialize/deserialize commands.
      */
     private RaftGroupServiceImpl(
             ReplicationGroupId groupId,
@@ -132,7 +135,8 @@ public class RaftGroupServiceImpl implements 
RaftGroupService {
             RaftConfiguration configuration,
             PeersAndLearners membersConfiguration,
             @Nullable Peer leader,
-            ScheduledExecutorService executor
+            ScheduledExecutorService executor,
+            Marshaller commandsMarshaller
     ) {
         this.cluster = cluster;
         this.configuration = configuration;
@@ -143,6 +147,7 @@ public class RaftGroupServiceImpl implements 
RaftGroupService {
         this.realGroupId = groupId;
         this.leader = leader;
         this.executor = executor;
+        this.commandsMarshaller = commandsMarshaller;
     }
 
     /**
@@ -164,7 +169,8 @@ public class RaftGroupServiceImpl implements 
RaftGroupService {
             RaftConfiguration configuration,
             PeersAndLearners membersConfiguration,
             boolean getLeader,
-            ScheduledExecutorService executor
+            ScheduledExecutorService executor,
+            Marshaller commandsMarshaller
     ) {
         var service = new RaftGroupServiceImpl(
                 groupId,
@@ -173,7 +179,8 @@ public class RaftGroupServiceImpl implements 
RaftGroupService {
                 configuration,
                 membersConfiguration,
                 null,
-                executor
+                executor,
+                commandsMarshaller
         );
 
         if (!getLeader) {
@@ -453,9 +460,14 @@ public class RaftGroupServiceImpl implements 
RaftGroupService {
         Function<Peer, ActionRequest> requestFactory;
 
         if (cmd instanceof WriteCommand) {
+            byte[] commandBytes = commandsMarshaller.marshall(cmd);
+
             requestFactory = targetPeer -> factory.writeActionRequest()
                     .groupId(groupId)
-                    .command((WriteCommand) cmd)
+                    .command(commandBytes)
+                    // Having prepared deserialized command makes its handling 
more efficient in the state machine.
+                    // This saves us from extra-deserialization on a local 
machine, which would take precious time to do.
+                    .deserializedCommand((WriteCommand) cmd)
                     .build();
         } else {
             requestFactory = targetPeer -> factory.readActionRequest()
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
index e849a99ffc..2825a7162b 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
@@ -17,11 +17,11 @@
 
 package org.apache.ignite.internal.raft.server;
 
+import org.apache.ignite.internal.raft.Marshaller;
 import org.apache.ignite.internal.raft.RaftNodeDisruptorConfiguration;
 import org.apache.ignite.internal.raft.storage.LogStorageFactory;
 import org.apache.ignite.internal.raft.storage.RaftMetaStorageFactory;
 import org.apache.ignite.internal.raft.storage.SnapshotStorageFactory;
-import org.apache.ignite.raft.jraft.util.Marshaller;
 import org.jetbrains.annotations.Nullable;
 
 /**
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java
index b757d2296e..d016dd9069 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java
@@ -40,6 +40,7 @@ public interface RaftServer extends IgniteComponent {
     /**
      * Returns cluster service.
      */
+    @TestOnly
     ClusterService clusterService();
 
     /**
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index 8b2404eb59..ca9393947d 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.raft.server.impl;
 
-import static java.util.Objects.requireNonNullElse;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toUnmodifiableList;
 
@@ -43,6 +42,7 @@ import java.util.stream.IntStream;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.IgniteStringFormatter;
 import org.apache.ignite.internal.lang.IgniteSystemProperties;
+import org.apache.ignite.internal.raft.Marshaller;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.PeersAndLearners;
 import org.apache.ignite.internal.raft.RaftGroupEventsListener;
@@ -58,7 +58,6 @@ import 
org.apache.ignite.internal.raft.storage.impl.DefaultLogStorageFactory;
 import org.apache.ignite.internal.raft.storage.impl.IgniteJraftServiceFactory;
 import 
org.apache.ignite.internal.raft.storage.impl.StripeAwareLogManager.Stripe;
 import org.apache.ignite.internal.raft.storage.logit.LogitLogStorageFactory;
-import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.network.ClusterService;
@@ -91,7 +90,6 @@ import 
org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
 import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
 import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
 import org.apache.ignite.raft.jraft.util.ExponentialBackoffTimeoutStrategy;
-import org.apache.ignite.raft.jraft.util.Marshaller;
 import org.apache.ignite.raft.jraft.util.Utils;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
@@ -136,9 +134,6 @@ public class JraftServerImpl implements RaftServer {
     /** Request executor. */
     private ExecutorService requestExecutor;
 
-    /** Marshaller for RAFT commands that is used if a marshaller is not 
specified in {@link RaftGroupOptions}. */
-    private final Marshaller defaultCommandsMarshaller;
-
     /** Raft service event interceptor. */
     private final RaftServiceEventInterceptor serviceEventInterceptor;
 
@@ -212,7 +207,6 @@ public class JraftServerImpl implements RaftServer {
 
         startGroupInProgressMonitors = Collections.unmodifiableList(monitors);
 
-        defaultCommandsMarshaller = new 
ThreadLocalOptimizedMarshaller(service.serializationRegistry());
         serviceEventInterceptor = new RaftServiceEventInterceptor();
     }
 
@@ -474,10 +468,11 @@ public class JraftServerImpl implements RaftServer {
 
             
nodeOptions.setSnapshotUri(serverDataPath.resolve("snapshot").toString());
 
-            Marshaller commandsMarshaller = 
requireNonNullElse(groupOptions.commandsMarshaller(), 
defaultCommandsMarshaller);
-            nodeOptions.setCommandsMarshaller(commandsMarshaller);
+            if (groupOptions.commandsMarshaller() != null) {
+                
nodeOptions.setCommandsMarshaller(groupOptions.commandsMarshaller());
+            }
 
-            nodeOptions.setFsm(new DelegatingStateMachine(lsnr, 
commandsMarshaller));
+            nodeOptions.setFsm(new DelegatingStateMachine(lsnr, 
nodeOptions.getCommandsMarshaller()));
 
             nodeOptions.setRaftGrpEvtsLsnr(new 
RaftGroupEventsListenerAdapter(nodeId.groupId(), serviceEventInterceptor, 
evLsnr));
 
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/OptimizedMarshaller.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/OptimizedMarshaller.java
index 28121efda3..cc4609830e 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/OptimizedMarshaller.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/OptimizedMarshaller.java
@@ -24,11 +24,11 @@ import 
org.apache.ignite.internal.network.direct.DirectMessageReader;
 import org.apache.ignite.internal.network.direct.DirectMessageWriter;
 import org.apache.ignite.internal.network.direct.stream.DirectByteBufferStream;
 import 
org.apache.ignite.internal.network.direct.stream.DirectByteBufferStreamImplV1;
+import org.apache.ignite.internal.raft.Marshaller;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.serialization.MessageReader;
 import org.apache.ignite.network.serialization.MessageSerializationRegistry;
 import org.apache.ignite.network.serialization.MessageWriter;
-import org.apache.ignite.raft.jraft.util.Marshaller;
 
 /**
  * Marshaller implementation that uses a {@link DirectByteBufferStream} 
variant to serialize/deserialize data.
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/ThreadLocalOptimizedMarshaller.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/ThreadLocalOptimizedMarshaller.java
index a25664c53d..e929125473 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/ThreadLocalOptimizedMarshaller.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/ThreadLocalOptimizedMarshaller.java
@@ -18,8 +18,8 @@
 package org.apache.ignite.internal.raft.util;
 
 import java.nio.ByteBuffer;
+import org.apache.ignite.internal.raft.Marshaller;
 import org.apache.ignite.network.serialization.MessageSerializationRegistry;
-import org.apache.ignite.raft.jraft.util.Marshaller;
 
 /**
  * Thread-safe variant of {@link OptimizedMarshaller}.
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
index 05c424b473..785cd06189 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
@@ -21,6 +21,7 @@ import java.util.concurrent.ExecutorService;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.raft.JraftGroupEventsListener;
+import org.apache.ignite.internal.raft.Marshaller;
 import 
org.apache.ignite.internal.raft.storage.impl.StripeAwareLogManager.Stripe;
 import org.apache.ignite.raft.jraft.JRaftServiceFactory;
 import org.apache.ignite.raft.jraft.StateMachine;
@@ -35,14 +36,12 @@ import 
org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
 import org.apache.ignite.raft.jraft.storage.SnapshotThrottle;
 import org.apache.ignite.raft.jraft.storage.impl.LogManagerImpl;
 import org.apache.ignite.raft.jraft.util.Copiable;
-import org.apache.ignite.raft.jraft.util.Marshaller;
 import org.apache.ignite.raft.jraft.util.NoopTimeoutStrategy;
 import org.apache.ignite.raft.jraft.util.StringUtils;
 import org.apache.ignite.raft.jraft.util.TimeoutStrategy;
 import org.apache.ignite.raft.jraft.util.Utils;
 import org.apache.ignite.raft.jraft.util.concurrent.FixedThreadsExecutorGroup;
 import org.apache.ignite.raft.jraft.util.timer.Timer;
-import org.jetbrains.annotations.Nullable;
 
 /**
  * Node options.
@@ -705,7 +704,6 @@ public class NodeOptions extends RpcOptions implements 
Copiable<NodeOptions> {
         this.electionTimeoutStrategy = electionTimeoutStrategy;
     }
 
-    @Nullable
     public Marshaller getCommandsMarshaller() {
         return commandsMarshaller;
     }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/WriteActionRequest.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/WriteActionRequest.java
index 14ef3852e6..a2dc30eaa4 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/WriteActionRequest.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/WriteActionRequest.java
@@ -19,7 +19,9 @@ package org.apache.ignite.raft.jraft.rpc;
 
 import org.apache.ignite.internal.raft.WriteCommand;
 import org.apache.ignite.network.annotations.Transferable;
+import org.apache.ignite.network.annotations.Transient;
 import org.apache.ignite.raft.jraft.RaftMessageGroup.RpcActionMessageGroup;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Submit a  write action to a replication group.
@@ -27,7 +29,15 @@ import 
org.apache.ignite.raft.jraft.RaftMessageGroup.RpcActionMessageGroup;
 @Transferable(RpcActionMessageGroup.WRITE_ACTION_REQUEST)
 public interface WriteActionRequest extends ActionRequest {
     /**
-     * Returns an action's command.
+     * @return Serialized action's command. Specific serialization format may 
differ from group to group.
      */
-    WriteCommand command();
+    byte[] command();
+
+    /**
+     * @return Original non-serialized command, if available. {@code null} if 
not. This field is used to avoid {@link #command()}
+     * deserialization in cases, where deserialized instance is already 
available. Typical situation for it is command's creation, where
+     * command is explicitly serialized into {@code byte[]} before building 
the message.
+     */
+    @Transient
+    @Nullable WriteCommand deserializedCommand();
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestInterceptor.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestInterceptor.java
index d00bccacd7..e7e7e7e640 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestInterceptor.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestInterceptor.java
@@ -17,10 +17,10 @@
 
 package org.apache.ignite.raft.jraft.rpc.impl;
 
-import org.apache.ignite.raft.jraft.rpc.ActionRequest;import 
org.apache.ignite.raft.jraft.rpc.Message;
-import org.apache.ignite.raft.jraft.rpc.RaftServerService;
-import org.apache.ignite.raft.jraft.rpc.RpcContext;import 
org.apache.ignite.raft.jraft.rpc.RpcRequestClosure;
-import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest;
+import org.apache.ignite.internal.raft.Marshaller;
+import org.apache.ignite.raft.jraft.rpc.ActionRequest;
+import org.apache.ignite.raft.jraft.rpc.Message;
+import org.apache.ignite.raft.jraft.rpc.RpcContext;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -34,7 +34,8 @@ public interface ActionRequestInterceptor {
      *
      * @param rpcCtx RPC context.
      * @param request Request in question.
+     * @param commandsMarshaller Marshaller that can be used to deserialize 
command from the request, if necessary.
      * @return A message to return to the caller, or {@code null} if standard 
handling should be used.
      */
-    @Nullable Message intercept(RpcContext rpcCtx, ActionRequest request);
+    @Nullable Message intercept(RpcContext rpcCtx, ActionRequest request, 
Marshaller commandsMarshaller);
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java
index aa9bbcf185..358880a359 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java
@@ -25,12 +25,15 @@ import java.util.concurrent.Executor;
 import org.apache.ignite.internal.lang.SafeTimeReorderException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.Marshaller;
+import org.apache.ignite.internal.raft.WriteCommand;
 import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
 import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.ReadCommand;
 import 
org.apache.ignite.internal.raft.server.impl.JraftServerImpl.DelegatingStateMachine;
 import org.apache.ignite.internal.raft.service.BeforeApplyHandler;
 import org.apache.ignite.internal.raft.service.CommandClosure;
+import org.apache.ignite.internal.raft.service.RaftGroupListener;
 import org.apache.ignite.raft.jraft.Closure;
 import org.apache.ignite.raft.jraft.Node;
 import org.apache.ignite.raft.jraft.RaftMessagesFactory;
@@ -48,7 +51,6 @@ import org.apache.ignite.raft.jraft.rpc.RpcProcessor;
 import org.apache.ignite.raft.jraft.rpc.RpcRequests;
 import org.apache.ignite.raft.jraft.rpc.WriteActionRequest;
 import org.apache.ignite.raft.jraft.util.BytesUtil;
-import org.apache.ignite.raft.jraft.util.Marshaller;
 
 /**
  * Process action request.
@@ -73,7 +75,7 @@ public class ActionRequestProcessor implements 
RpcProcessor<ActionRequest> {
 
     /** {@inheritDoc} */
     @Override
-    public void handleRequest(RpcContext rpcCtx, ActionRequest request) {
+    public final void handleRequest(RpcContext rpcCtx, ActionRequest request) {
         Node node = rpcCtx.getNodeManager().get(request.groupId(), new 
PeerId(rpcCtx.getLocalConsistentId()));
 
         if (node == null) {
@@ -82,39 +84,86 @@ public class ActionRequestProcessor implements 
RpcProcessor<ActionRequest> {
             return;
         }
 
-        JraftServerImpl.DelegatingStateMachine fsm = 
(JraftServerImpl.DelegatingStateMachine) node.getOptions().getFsm();
+        Marshaller commandsMarshaller = 
node.getOptions().getCommandsMarshaller();
+
+        assert commandsMarshaller != null : "Marshaller for group " + 
request.groupId() + " is not found.";
+
+        handleRequestInternal(rpcCtx, node, request, commandsMarshaller);
+    }
+
+    /**
+     * Internal part of the {@link #handleRequest(RpcContext, ActionRequest)}, 
that contains resolved RAFT node, as well as a commands
+     * marshaller instance. May be conveniently reused in subclasses.
+     */
+    protected void handleRequestInternal(RpcContext rpcCtx, Node node, 
ActionRequest request, Marshaller commandsMarshaller) {
+        DelegatingStateMachine fsm = (DelegatingStateMachine) 
node.getOptions().getFsm();
+        RaftGroupListener listener = fsm.getListener();
 
         if (request instanceof WriteActionRequest) {
+            WriteActionRequest writeRequest = (WriteActionRequest)request;
+
+            WriteCommand command = writeRequest.deserializedCommand();
+
+            if (command == null) {
+                command = 
commandsMarshaller.unmarshall(writeRequest.command());
+            }
+
             if (fsm.getListener() instanceof BeforeApplyHandler) {
                 synchronized (groupIdSyncMonitor(request.groupId())) {
                     try {
-                        callOnBeforeApply(request, fsm);
+                        writeRequest = patchCommandBeforeApply(writeRequest, 
(BeforeApplyHandler) listener, command, commandsMarshaller);
                     } catch (SafeTimeReorderException e) {
                         
rpcCtx.sendResponse(factory.errorResponse().errorCode(RaftError.EREORDER.getNumber()).build());
 
                         return;
                     }
 
-                    applyWrite(node, (WriteActionRequest) request, rpcCtx);
+                    applyWrite(node, writeRequest, command, rpcCtx);
                 }
             } else {
-                applyWrite(node, (WriteActionRequest) request, rpcCtx);
+                applyWrite(node, writeRequest, command, rpcCtx);
             }
         } else {
-            if (fsm.getListener() instanceof BeforeApplyHandler) {
-                callOnBeforeApply(request, fsm);
+            ReadActionRequest readRequest = (ReadActionRequest) request;
+
+            if (listener instanceof BeforeApplyHandler) {
+                ReadCommand command = readRequest.command();
+
+                readRequest = patchCommandBeforeApply(readRequest, 
(BeforeApplyHandler) listener, command, commandsMarshaller);
             }
 
-            applyRead(node, (ReadActionRequest) request, rpcCtx);
+            applyRead(node, readRequest, rpcCtx);
         }
     }
 
-    private static void callOnBeforeApply(ActionRequest request, 
DelegatingStateMachine fsm) {
-        Command command = request instanceof WriteActionRequest
-                ? ((WriteActionRequest) request).command()
-                : ((ReadActionRequest) request).command();
+    /**
+     * This method calls {@link BeforeApplyHandler#onBeforeApply(Command)} and 
returns action request with a serialized version of the
+     * updated command, if it has been updated. Otherwise, the method returns 
the original {@code request} instance. The reason for such
+     * behavior is the fact that we use {@code byte[]} in action requests, 
thus modified command should be serialized twice.
+     */
+    private <AR extends ActionRequest> AR patchCommandBeforeApply(
+            AR request,
+            BeforeApplyHandler beforeApplyHandler,
+            Command command,
+            Marshaller commandsMarshaller
+    ) throws SafeTimeReorderException {
+        if (!beforeApplyHandler.onBeforeApply(command)) {
+            return request;
+        }
 
-        ((BeforeApplyHandler) fsm.getListener()).onBeforeApply(command);
+        if (request instanceof WriteActionRequest) {
+            return (AR) factory.writeActionRequest()
+                .groupId(request.groupId())
+                .command(commandsMarshaller.marshall(command))
+                .deserializedCommand((WriteCommand)command)
+                .build();
+        } else {
+            return (AR) factory.readActionRequest()
+                .groupId(request.groupId())
+                .command((ReadCommand)command)
+                .readOnlySafe(((ReadActionRequest)request).readOnlySafe())
+                .build();
+        }
     }
 
     private Object groupIdSyncMonitor(String groupId) {
@@ -124,17 +173,14 @@ public class ActionRequestProcessor implements 
RpcProcessor<ActionRequest> {
     }
 
     /**
-     * @param node    The node.
+     * @param node The node.
      * @param request The request.
-     * @param rpcCtx  The context.
+     * @param command The command.
+     * @param rpcCtx The context.
      */
-    private void applyWrite(Node node, WriteActionRequest request, RpcContext 
rpcCtx) {
-        Marshaller commandsMarshaller = 
node.getOptions().getCommandsMarshaller();
-
-        assert commandsMarshaller != null;
-
-        node.apply(new 
Task(ByteBuffer.wrap(commandsMarshaller.marshall(request.command())),
-                new CommandClosureImpl<>(request.command()) {
+    private void applyWrite(Node node, WriteActionRequest request, Command 
command, RpcContext rpcCtx) {
+        node.apply(new Task(ByteBuffer.wrap(request.command()),
+                new CommandClosureImpl<>(command) {
                     @Override
                     public void result(Serializable res) {
                         if (res instanceof Throwable) {
@@ -156,9 +202,9 @@ public class ActionRequestProcessor implements 
RpcProcessor<ActionRequest> {
     }
 
     /**
-     * @param node    The node.
+     * @param node The node.
      * @param request The request.
-     * @param rpcCtx  The context.
+     * @param rpcCtx The context.
      */
     private void applyRead(Node node, ReadActionRequest request, RpcContext 
rpcCtx) {
         if (request.readOnlySafe()) {
@@ -171,7 +217,7 @@ public class ActionRequestProcessor implements 
RpcProcessor<ActionRequest> {
                         try {
                             
fsm.getListener().onRead(List.<CommandClosure<ReadCommand>>of(new 
CommandClosure<>() {
                                 @Override public ReadCommand command() {
-                                    return (ReadCommand)request.command();
+                                    return request.command();
                                 }
 
                                 @Override public void result(Serializable res) 
{
@@ -201,7 +247,7 @@ public class ActionRequestProcessor implements 
RpcProcessor<ActionRequest> {
             try {
                 
fsm.getListener().onRead(List.<CommandClosure<ReadCommand>>of(new 
CommandClosure<>() {
                     @Override public ReadCommand command() {
-                        return (ReadCommand)request.command();
+                        return request.command();
                     }
 
                     @Override public void result(Serializable res) {
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/InterceptingActionRequestProcessor.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/InterceptingActionRequestProcessor.java
index dac65dc7be..18fd8901dd 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/InterceptingActionRequestProcessor.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/InterceptingActionRequestProcessor.java
@@ -18,6 +18,8 @@
 package org.apache.ignite.raft.jraft.rpc.impl;
 
 import java.util.concurrent.Executor;
+import org.apache.ignite.internal.raft.Marshaller;
+import org.apache.ignite.raft.jraft.Node;
 import org.apache.ignite.raft.jraft.RaftMessagesFactory;
 import org.apache.ignite.raft.jraft.rpc.ActionRequest;
 import org.apache.ignite.raft.jraft.rpc.Message;
@@ -39,13 +41,13 @@ public class InterceptingActionRequestProcessor extends 
ActionRequestProcessor {
     }
 
     @Override
-    public void handleRequest(RpcContext rpcCtx, ActionRequest request) {
-        Message interceptionResult = interceptor.intercept(rpcCtx, request);
+    protected void handleRequestInternal(RpcContext rpcCtx, Node node, 
ActionRequest request, Marshaller commandsMarshaller) {
+        Message interceptionResult = interceptor.intercept(rpcCtx, request, 
commandsMarshaller);
 
         if (interceptionResult != null) {
             rpcCtx.sendResponse(interceptionResult);
         } else {
-            super.handleRequest(rpcCtx, request);
+            super.handleRequestInternal(rpcCtx, node, request, 
commandsMarshaller);
         }
     }
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/NullActionRequestInterceptor.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/NullActionRequestInterceptor.java
index 77c46ede23..686c425cae 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/NullActionRequestInterceptor.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/NullActionRequestInterceptor.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.raft.jraft.rpc.impl;
 
+import org.apache.ignite.internal.raft.Marshaller;
 import org.apache.ignite.raft.jraft.rpc.ActionRequest;import 
org.apache.ignite.raft.jraft.rpc.Message;
 import org.apache.ignite.raft.jraft.rpc.RpcContext;
 import org.jetbrains.annotations.Nullable;
@@ -27,7 +28,7 @@ import org.jetbrains.annotations.Nullable;
  */
 public class NullActionRequestInterceptor implements ActionRequestInterceptor {
     @Override
-    public @Nullable Message intercept(RpcContext rpcCtx, ActionRequest 
request) {
+    public @Nullable Message intercept(RpcContext rpcCtx, ActionRequest 
request, Marshaller commandsMarshaller) {
         return null;
     }
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/io/MessageFile.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/io/MessageFile.java
index 1242f4d608..590ce7f851 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/io/MessageFile.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/io/MessageFile.java
@@ -23,10 +23,9 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.nio.ByteBuffer;
 import org.apache.ignite.raft.jraft.rpc.Message;
 import org.apache.ignite.raft.jraft.util.Bits;
-import org.apache.ignite.raft.jraft.util.Marshaller;
+import org.apache.ignite.raft.jraft.util.JDKMarshaller;
 import org.apache.ignite.raft.jraft.util.Utils;
 
 /**
@@ -60,7 +59,7 @@ public class MessageFile {
             }
             final byte[] nameBytes = new byte[len];
             readBytes(nameBytes, input);
-            return Marshaller.DEFAULT.unmarshall(ByteBuffer.wrap(nameBytes));
+            return JDKMarshaller.INSTANCE.unmarshall(nameBytes);
         }
     }
 
@@ -83,7 +82,7 @@ public class MessageFile {
         final File file = new File(this.path + ".tmp");
         try (final FileOutputStream fOut = new FileOutputStream(file);
              final BufferedOutputStream output = new 
BufferedOutputStream(fOut)) {
-            byte[] bytes = Marshaller.DEFAULT.marshall(msg);
+            byte[] bytes = JDKMarshaller.INSTANCE.marshall(msg);
 
             final byte[] lenBytes = new byte[4];
             Bits.putInt(lenBytes, 0, bytes.length);
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotMetaTable.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotMetaTable.java
index c0444379e9..a9d6241361 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotMetaTable.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotMetaTable.java
@@ -32,7 +32,7 @@ import 
org.apache.ignite.raft.jraft.entity.LocalStorageOutter.LocalSnapshotPbMet
 import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
 import org.apache.ignite.raft.jraft.option.RaftOptions;
 import org.apache.ignite.raft.jraft.storage.io.MessageFile;
-import org.apache.ignite.raft.jraft.util.Marshaller;
+import org.apache.ignite.raft.jraft.util.JDKMarshaller;
 
 /**
  * Table to keep local snapshot metadata infos.
@@ -71,7 +71,7 @@ public class LocalSnapshotMetaTable {
 
         pbMetaBuilder.filesList(files);
 
-        return 
ByteBuffer.wrap(Marshaller.DEFAULT.marshall(pbMetaBuilder.build()));
+        return 
ByteBuffer.wrap(JDKMarshaller.INSTANCE.marshall(pbMetaBuilder.build()));
     }
 
     /**
@@ -83,7 +83,7 @@ public class LocalSnapshotMetaTable {
             return false;
         }
         try {
-            final LocalSnapshotPbMeta pbMeta = 
Marshaller.DEFAULT.unmarshall(buf);
+            final LocalSnapshotPbMeta pbMeta = 
JDKMarshaller.INSTANCE.unmarshall(buf);
             if (pbMeta == null) {
                 LOG.error("Fail to load meta from buffer.");
                 return false;
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/JDKMarshaller.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/JDKMarshaller.java
index 9ecf321ef4..f5b46909b1 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/JDKMarshaller.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/JDKMarshaller.java
@@ -21,11 +21,15 @@ import java.io.ByteArrayOutputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.nio.ByteBuffer;
+import org.apache.ignite.internal.raft.Marshaller;
 
 /**
- *
+ * {@link Marshaller} implementation, based on standard {@link 
ObjectInputStream} and {@link ObjectOutputStream}.
  */
 public class JDKMarshaller implements Marshaller {
+    /** Pre-allocated {@link JDKMarshaller} instance. */
+    public static final Marshaller INSTANCE = new JDKMarshaller();
+
     /**
      * {@inheritDoc}
      */
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java
index f7a56f21ed..735b4a7d4f 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java
@@ -56,6 +56,8 @@ import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguratio
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.raft.util.OptimizedMarshaller;
+import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller;
 import org.apache.ignite.internal.replicator.TestReplicationGroupId;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
@@ -66,6 +68,7 @@ import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.MessagingService;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.TopologyService;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
 import org.apache.ignite.raft.TestWriteCommand;
 import org.apache.ignite.raft.jraft.RaftMessagesFactory;
 import org.apache.ignite.raft.jraft.Status;
@@ -88,6 +91,7 @@ import 
org.apache.ignite.raft.jraft.rpc.RpcRequests.ErrorResponse;
 import org.apache.ignite.raft.jraft.rpc.RpcRequests.ReadIndexRequest;
 import org.apache.ignite.raft.jraft.rpc.WriteActionRequest;
 import org.apache.ignite.raft.jraft.rpc.impl.RaftException;
+import org.apache.ignite.utils.ClusterServiceTestUtils;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -143,6 +147,9 @@ public class RaftGroupServiceTest extends 
BaseIgniteAbstractTest {
         when(cluster.messagingService()).thenReturn(messagingService);
         when(cluster.topologyService()).thenReturn(topologyService);
 
+        MessageSerializationRegistry serializationRegistry = 
ClusterServiceTestUtils.defaultSerializationRegistry();
+        
when(cluster.serializationRegistry()).thenReturn(serializationRegistry);
+
         when(topologyService.getByConsistentId(any()))
                 .thenAnswer(invocation -> {
                     String consistentId = invocation.getArgument(0);
@@ -587,8 +594,11 @@ public class RaftGroupServiceTest extends 
BaseIgniteAbstractTest {
     private RaftGroupService startRaftGroupService(List<Peer> peers, boolean 
getLeader) {
         PeersAndLearners memberConfiguration = 
PeersAndLearners.fromPeers(peers, Set.of());
 
-        CompletableFuture<RaftGroupService> service =
-                RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, 
raftConfiguration, memberConfiguration, getLeader, executor);
+        var commandsSerializer = new 
ThreadLocalOptimizedMarshaller(cluster.serializationRegistry());
+
+        CompletableFuture<RaftGroupService> service = 
RaftGroupServiceImpl.start(
+                TEST_GRP, cluster, FACTORY, raftConfiguration, 
memberConfiguration, getLeader, executor, commandsSerializer
+        );
 
         assertThat(service, willCompleteSuccessfully());
 
@@ -613,12 +623,16 @@ public class RaftGroupServiceTest extends 
BaseIgniteAbstractTest {
      * @param peer Fail the request targeted to given peer.
      */
     private void mockUserInput(boolean delay, @Nullable Peer peer) {
+        //noinspection Convert2Lambda
         when(messagingService.invoke(
                 any(ClusterNode.class),
-                argThat(new ArgumentMatcher<ActionRequest>() {
+                // Must be an anonymous class, to deduce the message type from 
the generic superclass.
+                argThat(new ArgumentMatcher<WriteActionRequest>() {
                     @Override
-                    public boolean matches(ActionRequest arg) {
-                        return arg instanceof WriteActionRequest && 
((WriteActionRequest) arg).command() instanceof TestWriteCommand;
+                    public boolean matches(WriteActionRequest arg) {
+                        Object command = new 
OptimizedMarshaller(cluster.serializationRegistry()).unmarshall(arg.command());
+
+                        return command instanceof TestWriteCommand;
                     }
                 }),
                 anyLong()
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AppendEntriesBenchmark.java
 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AppendEntriesBenchmark.java
index 1c274f1f88..edd6145bb3 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AppendEntriesBenchmark.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AppendEntriesBenchmark.java
@@ -23,7 +23,7 @@ import org.apache.ignite.raft.jraft.RaftMessagesFactory;
 import org.apache.ignite.raft.jraft.util.AdaptiveBufAllocator;
 import org.apache.ignite.raft.jraft.util.ByteBufferCollector;
 import org.apache.ignite.raft.jraft.util.ByteString;
-import org.apache.ignite.raft.jraft.util.Marshaller;
+import org.apache.ignite.raft.jraft.util.JDKMarshaller;
 import org.apache.ignite.raft.jraft.util.RecyclableByteBufferList;
 import org.apache.ignite.raft.jraft.util.RecycleUtil;
 import org.openjdk.jmh.annotations.Benchmark;
@@ -171,7 +171,7 @@ public class AppendEntriesBenchmark {
         final ByteBuffer buf = dataBuffer.getBuffer();
         buf.flip();
         rb.data(new ByteString(buf));
-        return Marshaller.DEFAULT.marshall(rb.build());
+        return JDKMarshaller.INSTANCE.marshall(rb.build());
     }
 
     private byte[] sendEntries2() {
@@ -188,7 +188,7 @@ public class AppendEntriesBenchmark {
             final ByteBuffer buf = dataBuffer.getBuffer();
             buf.flip();
             rb.data(new ByteString(buf));
-            return Marshaller.DEFAULT.marshall(rb.build());
+            return JDKMarshaller.INSTANCE.marshall(rb.build());
         }
         finally {
             RecycleUtil.recycle(dataBuffer);
@@ -211,7 +211,7 @@ public class AppendEntriesBenchmark {
             final int remaining = buf.remaining();
             handleThreadLocal.get().record(remaining);
             rb.data(new ByteString(buf));
-            return Marshaller.DEFAULT.marshall(rb.build());
+            return JDKMarshaller.INSTANCE.marshall(rb.build());
         }
         finally {
             RecycleUtil.recycle(dataBuffer);
@@ -231,7 +231,7 @@ public class AppendEntriesBenchmark {
                 dataBuffer.add(buf.slice());
             }
             rb.data(RecyclableByteBufferList.concatenate(dataBuffer));
-            return Marshaller.DEFAULT.marshall(rb.build());
+            return JDKMarshaller.INSTANCE.marshall(rb.build());
         }
         finally {
             RecycleUtil.recycle(dataBuffer);
diff --git 
a/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java
 
b/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java
index ec6ce84fbb..bfdf5fb537 100644
--- 
a/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java
+++ 
b/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java
@@ -44,6 +44,7 @@ import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguratio
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.Marshaller;
 import org.apache.ignite.internal.raft.PeersAndLearners;
 import org.apache.ignite.internal.raft.RaftGroupServiceImpl;
 import org.apache.ignite.internal.raft.RaftNodeId;
@@ -433,7 +434,7 @@ public abstract class ItAbstractListenerSnapshotTest<T 
extends RaftGroupListener
                 new RaftNodeId(raftGroupId(), 
initialMemberConf.peer(service.topologyService().localMember().name())),
                 initialMemberConf,
                 createListener(service, listenerPersistencePath, idx),
-                defaults()
+                defaults().commandsMarshaller(commandsMarshaller(service))
         );
 
         return server;
@@ -454,14 +455,7 @@ public abstract class ItAbstractListenerSnapshotTest<T 
extends RaftGroupListener
         return startClient(testInfo, raftGroupId(), new 
NetworkAddress(getLocalAddress(), PORT));
     }
 
-    /**
-     * Returns a client service.
-     *
-     * @return The client service.
-     */
-    protected ClusterService clientService() {
-        return cluster.get(initialMemberConf.peers().size());
-    }
+    protected abstract Marshaller commandsMarshaller(ClusterService 
clusterService);
 
     /**
      * Starts a client with a specific address.
@@ -471,8 +465,10 @@ public abstract class ItAbstractListenerSnapshotTest<T 
extends RaftGroupListener
     private RaftGroupService startClient(TestInfo testInfo, 
TestReplicationGroupId groupId, NetworkAddress addr) {
         ClusterService clientNode = clusterService(testInfo, CLIENT_PORT + 
clients.size(), addr);
 
+        Marshaller commandsMarshaller = commandsMarshaller(clientNode);
+
         CompletableFuture<RaftGroupService> clientFuture = RaftGroupServiceImpl
-                .start(groupId, clientNode, FACTORY, raftConfiguration, 
initialMemberConf, true, executor);
+                .start(groupId, clientNode, FACTORY, raftConfiguration, 
initialMemberConf, true, executor, commandsMarshaller);
 
         assertThat(clientFuture, willCompleteSuccessfully());
 
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
index 96fc615916..993a4da6ab 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.LeaderElectionListener;
+import org.apache.ignite.internal.raft.Marshaller;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.PeersAndLearners;
 import org.apache.ignite.internal.raft.RaftGroupServiceImpl;
@@ -165,6 +166,7 @@ public class TopologyAwareRaftGroupService implements 
RaftGroupService {
      * @param logicalTopologyService Logical topology service.
      * @param notifyOnSubscription Whether to notify callback after 
subscription to pass the current leader and term into it, even
      *         if the leader did not change in that moment (see {@link 
#subscribeLeader}).
+     * @param cmdMarshaller Marshaller that should be used to 
serialize/deserialize commands.
      * @return Future to create a raft client.
      */
     public static CompletableFuture<TopologyAwareRaftGroupService> start(
@@ -177,9 +179,10 @@ public class TopologyAwareRaftGroupService implements 
RaftGroupService {
             ScheduledExecutorService executor,
             LogicalTopologyService logicalTopologyService,
             RaftGroupEventsClientListener eventsClientListener,
-            boolean notifyOnSubscription
+            boolean notifyOnSubscription,
+            Marshaller cmdMarshaller
     ) {
-        return RaftGroupServiceImpl.start(groupId, cluster, factory, 
raftConfiguration, configuration, getLeader, executor)
+        return RaftGroupServiceImpl.start(groupId, cluster, factory, 
raftConfiguration, configuration, getLeader, executor, cmdMarshaller)
                 .thenApply(raftGroupService -> new 
TopologyAwareRaftGroupService(cluster, factory, executor, raftConfiguration,
                         raftGroupService, logicalTopologyService, 
eventsClientListener, notifyOnSubscription));
     }
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceFactory.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceFactory.java
index 959f21f8ea..304d018188 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceFactory.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceFactory.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.raft.client;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
+import org.apache.ignite.internal.raft.Marshaller;
 import org.apache.ignite.internal.raft.PeersAndLearners;
 import org.apache.ignite.internal.raft.RaftServiceFactory;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
@@ -66,7 +67,8 @@ public class TopologyAwareRaftGroupServiceFactory implements 
RaftServiceFactory<
             ReplicationGroupId groupId,
             PeersAndLearners peersAndLearners,
             RaftConfiguration raftConfiguration,
-            ScheduledExecutorService raftClientExecutor
+            ScheduledExecutorService raftClientExecutor,
+            Marshaller commandsMarshaller
     ) {
         return TopologyAwareRaftGroupService.start(
                 groupId,
@@ -78,7 +80,8 @@ public class TopologyAwareRaftGroupServiceFactory implements 
RaftServiceFactory<
                 raftClientExecutor,
                 logicalTopologyService,
                 eventsClientListener,
-                true
+                true,
+                commandsMarshaller
         );
     }
 }
diff --git 
a/modules/replicator/src/test/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceTest.java
 
b/modules/replicator/src/test/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceTest.java
index 2fe2548f28..dedfe854c8 100644
--- 
a/modules/replicator/src/test/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceTest.java
+++ 
b/modules/replicator/src/test/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceTest.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.raft.TestRaftGroupListener;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.raft.server.RaftGroupOptions;
 import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller;
 import org.apache.ignite.internal.replicator.TestReplicationGroupId;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
@@ -348,10 +349,15 @@ public class TopologyAwareRaftGroupServiceTest extends 
IgniteAbstractTest {
 
                 var dataPath = workDir.resolve("raft_" + 
localPeer.consistentId());
 
+                var commandsMarshaller = new 
ThreadLocalOptimizedMarshaller(cluster.serializationRegistry());
+
+                NodeOptions nodeOptions = new NodeOptions();
+                nodeOptions.setCommandsMarshaller(commandsMarshaller);
+
                 var raftServer = new JraftServerImpl(
                         cluster,
                         dataPath,
-                        new NodeOptions(),
+                        nodeOptions,
                         eventsClientListener
                 );
                 raftServer.start();
@@ -360,7 +366,7 @@ public class TopologyAwareRaftGroupServiceTest extends 
IgniteAbstractTest {
                         new RaftNodeId(GROUP_ID, localPeer),
                         peersAndLearners,
                         new TestRaftGroupListener(),
-                        RaftGroupOptions.defaults()
+                        
RaftGroupOptions.defaults().commandsMarshaller(commandsMarshaller)
                 );
 
                 raftServers.put(addr, raftServer);
@@ -398,6 +404,8 @@ public class TopologyAwareRaftGroupServiceTest extends 
IgniteAbstractTest {
             });
         }
 
+        var commandsMarshaller = new 
ThreadLocalOptimizedMarshaller(localClusterService.serializationRegistry());
+
         return TopologyAwareRaftGroupService.start(
                 GROUP_ID,
                 localClusterService,
@@ -408,7 +416,8 @@ public class TopologyAwareRaftGroupServiceTest extends 
IgniteAbstractTest {
                 executor,
                 new LogicalTopologyServiceTestImpl(localClusterService),
                 eventsClientListener,
-                notifyOnSubscription
+                notifyOnSubscription,
+                commandsMarshaller
         ).join();
     }
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java
index 21bfa15b83..e3366fc3eb 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java
@@ -47,7 +47,9 @@ import 
org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
 import org.apache.ignite.internal.table.TableViewInternal;
 import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand;
+import 
org.apache.ignite.internal.table.distributed.schema.PartitionCommandsMarshallerImpl;
 import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
 import org.apache.ignite.raft.jraft.rpc.WriteActionRequest;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
@@ -236,13 +238,17 @@ public class ItBuildIndexTest extends 
BaseSqlIntegrationTest {
      *         the command was sent.
      * @param dropBuildIndexCommand {@code True} to drop {@link 
BuildIndexCommand}.
      */
-    private static BiPredicate<String, NetworkMessage> 
waitSendBuildIndexCommand(
+    private BiPredicate<String, NetworkMessage> waitSendBuildIndexCommand(
             CompletableFuture<Integer> sendBuildIndexCommandFuture,
             boolean dropBuildIndexCommand
     ) {
+        IgniteImpl node = CLUSTER.node(0);
+        MessageSerializationRegistry serializationRegistry = 
node.raftManager().service().serializationRegistry();
+        var commandsMarshaller = new 
PartitionCommandsMarshallerImpl(serializationRegistry);
+
         return (nodeConsistentId, networkMessage) -> {
             if (networkMessage instanceof WriteActionRequest) {
-                Command command = ((WriteActionRequest) 
networkMessage).command();
+                Command command = 
commandsMarshaller.unmarshall(((WriteActionRequest) networkMessage).command());
 
                 if (command instanceof BuildIndexCommand) {
                     sendBuildIndexCommandFuture.complete(((BuildIndexCommand) 
command).indexId());
diff --git a/modules/table/build.gradle b/modules/table/build.gradle
index 4af4baa239..189e0952ec 100644
--- a/modules/table/build.gradle
+++ b/modules/table/build.gradle
@@ -67,6 +67,7 @@ dependencies {
     testImplementation(testFixtures(project(':ignite-storage-api')))
     testImplementation(testFixtures(project(':ignite-metastorage')))
     testImplementation(testFixtures(project(':ignite-marshaller-common')))
+    testImplementation(testFixtures(project(':ignite-network')))
     testImplementation(testFixtures(project(':ignite-placement-driver-api')))
     testImplementation(testFixtures(project(':ignite-distribution-zones')))
     testImplementation(testFixtures(project(':ignite-catalog')))
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
index 3db0aa2e3d..c2f066a838 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
+import org.apache.ignite.internal.raft.Marshaller;
 import org.apache.ignite.internal.raft.server.RaftServer;
 import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
 import org.apache.ignite.internal.raft.service.ItAbstractListenerSnapshotTest;
@@ -91,6 +92,7 @@ import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWrit
 import 
org.apache.ignite.internal.table.distributed.replication.request.SingleRowPkReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
 import 
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
+import 
org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
 import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
@@ -523,6 +525,11 @@ public class ItTablePersistenceTest extends 
ItAbstractListenerSnapshotTest<Parti
         return new TestReplicationGroupId("partitions");
     }
 
+    @Override
+    protected Marshaller commandsMarshaller(ClusterService clusterService) {
+        return new 
ThreadLocalPartitionCommandsMarshaller(clusterService.serializationRegistry());
+    }
+
     /**
      * Creates a {@link Row} with the supplied key and value.
      *
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
index c9ed4e4b64..ed6954c01c 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
@@ -49,6 +49,7 @@ import java.util.stream.IntStream;
 import org.apache.ignite.internal.Cluster;
 import org.apache.ignite.internal.IgniteIntegrationTest;
 import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.cluster.management.CmgGroupId;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
@@ -60,12 +61,14 @@ import 
org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorage
 import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.incoming.IncomingSnapshotCopier;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaResponse;
+import 
org.apache.ignite.internal.table.distributed.schema.PartitionCommandsMarshallerImpl;
 import org.apache.ignite.internal.test.WatchListenerInhibitor;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.log4j2.LogInspector;
 import org.apache.ignite.internal.testframework.log4j2.LogInspector.Handler;
 import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
 import org.apache.ignite.raft.jraft.RaftGroupService;
 import org.apache.ignite.raft.jraft.RaftMessagesFactory;
 import org.apache.ignite.raft.jraft.Status;
@@ -556,9 +559,12 @@ class ItTableRaftSnapshotsTest extends 
IgniteIntegrationTest {
 
         try {
             prepareClusterForInstallingSnapshotToNode2(DEFAULT_STORAGE_ENGINE, 
theCluster -> {
+                IgniteImpl node = theCluster.node(0);
+                MessageSerializationRegistry serializationRegistry = 
node.raftManager().service().serializationRegistry();
+
                 BiPredicate<String, NetworkMessage> 
dropSafeTimeUntilSecondInstallSnapshotRequestIsProcessed = (recipientId, 
message) ->
                         message instanceof WriteActionRequest
-                                && ((WriteActionRequest) message).command() 
instanceof SafeTimeSyncCommand
+                                && isSafeTimeSyncCommand((WriteActionRequest) 
message, serializationRegistry)
                                 && 
!snapshotInstallFailedDueToIdenticalRetry.get();
 
                 theCluster.node(0).dropMessages(
@@ -576,6 +582,17 @@ class ItTableRaftSnapshotsTest extends 
IgniteIntegrationTest {
         }
     }
 
+    private static boolean isSafeTimeSyncCommand(WriteActionRequest request, 
MessageSerializationRegistry serializationRegistry) {
+        String groupId = request.groupId();
+
+        if (groupId.equals(MetastorageGroupId.INSTANCE.toString()) || 
groupId.equals(CmgGroupId.INSTANCE.toString())) {
+            return false;
+        }
+
+        var commandsMarshaller = new 
PartitionCommandsMarshallerImpl(serializationRegistry);
+        return commandsMarshaller.unmarshall(request.command()) instanceof 
SafeTimeSyncCommand;
+    }
+
     @Test
     void testChangeLeaderOnInstallSnapshotInMiddle() throws Exception {
         CompletableFuture<Void> sentSnapshotMetaResponseFormNode1Future = new 
CompletableFuture<>();
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 4fb2bd45ca..19d2e1c3f9 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -107,6 +107,7 @@ import 
org.apache.ignite.internal.metastorage.dsl.Conditions;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.Marshaller;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.PeersAndLearners;
 import org.apache.ignite.internal.raft.RaftGroupEventsListener;
@@ -175,7 +176,6 @@ import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.TopologyService;
 import org.apache.ignite.raft.jraft.storage.impl.VolatileRaftMetaStorage;
-import org.apache.ignite.raft.jraft.util.Marshaller;
 import org.apache.ignite.table.Table;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
@@ -756,7 +756,9 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
                         .thenComposeAsync(v -> inBusyLock(busyLock, () -> {
                             try {
                                 //TODO IGNITE-19614 This procedure takes 10 
seconds if there's no majority online.
-                                return 
raftMgr.startRaftGroupService(replicaGrpId, newConfiguration, 
raftGroupServiceFactory);
+                                return raftMgr.startRaftGroupService(
+                                        replicaGrpId, newConfiguration, 
raftGroupServiceFactory, raftCommandsMarshaller
+                                );
                             } catch (NodeStoppingException ex) {
                                 return failedFuture(ex);
                             }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/CatalogVersionAware.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/CatalogVersionAware.java
index f7f4490344..a115b59449 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/CatalogVersionAware.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/CatalogVersionAware.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.table.distributed.command;
 
+import 
org.apache.ignite.internal.table.distributed.schema.PartitionCommandsMarshaller;
+
 /**
  * A command that requires certain level of catalog version to be locally 
available just to be accepted on the node.
  */
@@ -25,4 +27,9 @@ public interface CatalogVersionAware {
      * Returns version that the Catalog must have locally for the node to be 
allowed to accept this command via replication.
      */
     int requiredCatalogVersion();
+
+    /**
+     * Setter for {@link #requiredCatalogVersion()}. Called by the creator or 
the {@link PartitionCommandsMarshaller} while deserializing.
+     */
+    void requiredCatalogVersion(int version);
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java
index cb56b85d8f..a69d8fa731 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.table.distributed.command;
 
 import java.util.UUID;
 import 
org.apache.ignite.internal.replicator.command.SafeTimePropagatingCommand;
+import org.apache.ignite.network.annotations.Transient;
+import org.apache.ignite.network.annotations.WithSetter;
 
 /**
  * Partition transactional command.
@@ -34,9 +36,13 @@ public interface PartitionCommand extends 
SafeTimePropagatingCommand, CatalogVer
      */
     boolean full();
 
-    /**
-     * Returns version that the Catalog must have locally for the node to be 
allowed to accept this command via replication.
-     */
     @Override
+    @Transient
+    @WithSetter
     int requiredCatalogVersion();
+
+    @Override
+    default void requiredCatalogVersion(int version) {
+        // No-op.
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 27b425273f..5c1710fa1e 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -477,7 +477,7 @@ public class PartitionListener implements 
RaftGroupListener, BeforeApplyHandler
     }
 
     @Override
-    public void onBeforeApply(Command command) {
+    public boolean onBeforeApply(Command command) {
         // This method is synchronized by replication group specific monitor, 
see ActionRequestProcessor#handleRequest.
         if (command instanceof SafeTimePropagatingCommand) {
             SafeTimePropagatingCommand cmd = (SafeTimePropagatingCommand) 
command;
@@ -489,6 +489,8 @@ public class PartitionListener implements 
RaftGroupListener, BeforeApplyHandler
                 throw new SafeTimeReorderException();
             }
         }
+
+        return false;
     }
 
     /**
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequest.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequest.java
index 47b90121bb..499ae26edd 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequest.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequest.java
@@ -19,12 +19,12 @@ package org.apache.ignite.internal.table.distributed.schema;
 
 import static 
org.apache.ignite.internal.table.distributed.schema.CatalogVersionSufficiency.isMetadataAvailableFor;
 
+import java.nio.ByteBuffer;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.Loza;
-import 
org.apache.ignite.internal.table.distributed.command.CatalogVersionAware;
+import org.apache.ignite.internal.raft.Marshaller;
 import org.apache.ignite.raft.jraft.Node;
 import org.apache.ignite.raft.jraft.Status;
 import org.apache.ignite.raft.jraft.core.NodeImpl;
@@ -53,7 +53,7 @@ public class CheckCatalogVersionOnActionRequest implements 
ActionRequestIntercep
     }
 
     @Override
-    public @Nullable Message intercept(RpcContext rpcCtx, ActionRequest 
request) {
+    public @Nullable Message intercept(RpcContext rpcCtx, ActionRequest 
request, Marshaller commandsMarshaller) {
         Node node = rpcCtx.getNodeManager().get(request.groupId(), new 
PeerId(rpcCtx.getLocalConsistentId()));
 
         if (node == null) {
@@ -65,15 +65,21 @@ public class CheckCatalogVersionOnActionRequest implements 
ActionRequestIntercep
             return errorIfNotLeader;
         }
 
+        if (!(commandsMarshaller instanceof PartitionCommandsMarshaller)) {
+            return null;
+        }
+
         if (!(request instanceof WriteActionRequest)) {
             return null;
         }
 
-        Command command = ((WriteActionRequest) request).command();
+        byte[] command = ((WriteActionRequest) request).command();
+
+        var partitionCommandsMarshaller = (PartitionCommandsMarshaller) 
commandsMarshaller;
 
-        if (command instanceof CatalogVersionAware) {
-            int requiredCatalogVersion = ((CatalogVersionAware) 
command).requiredCatalogVersion();
+        int requiredCatalogVersion = 
partitionCommandsMarshaller.readRequiredCatalogVersion(ByteBuffer.wrap(command));
 
+        if (requiredCatalogVersion >= 0) {
             if (!isMetadataAvailableFor(requiredCatalogVersion, 
catalogService)) {
                 // TODO: IGNITE-20298 - throttle logging.
                 LOG.warn(
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnAppendEntries.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnAppendEntries.java
index 53270535b6..519b833c8d 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnAppendEntries.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnAppendEntries.java
@@ -18,11 +18,13 @@
 package org.apache.ignite.internal.table.distributed.schema;
 
 import static 
org.apache.ignite.internal.table.distributed.schema.CatalogVersionSufficiency.isMetadataAvailableFor;
+import static 
org.apache.ignite.internal.table.distributed.schema.PartitionCommandsMarshaller.NO_VERSION_REQUIRED;
 
 import java.nio.ByteBuffer;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.Marshaller;
 import org.apache.ignite.raft.jraft.Node;
 import org.apache.ignite.raft.jraft.entity.EnumOutter.EntryType;
 import org.apache.ignite.raft.jraft.entity.RaftOutter;
@@ -34,7 +36,6 @@ import org.apache.ignite.raft.jraft.rpc.RaftServerService;
 import org.apache.ignite.raft.jraft.rpc.RpcRequestClosure;
 import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest;
 import 
org.apache.ignite.raft.jraft.rpc.impl.core.AppendEntriesRequestInterceptor;
-import org.apache.ignite.raft.jraft.util.Marshaller;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -44,8 +45,6 @@ import org.jetbrains.annotations.Nullable;
 public class CheckCatalogVersionOnAppendEntries implements 
AppendEntriesRequestInterceptor {
     private static final IgniteLogger LOG = 
Loggers.forClass(CheckCatalogVersionOnAppendEntries.class);
 
-    private static final int NO_VERSION_REQUIREMENT = Integer.MIN_VALUE;
-
     private final CatalogService catalogService;
 
     public CheckCatalogVersionOnAppendEntries(CatalogService catalogService) {
@@ -66,7 +65,7 @@ public class CheckCatalogVersionOnAppendEntries implements 
AppendEntriesRequestI
         for (RaftOutter.EntryMeta entry : request.entriesList()) {
             int requiredCatalogVersion = 
readRequiredCatalogVersionForMeta(allData, entry, 
node.getOptions().getCommandsMarshaller());
 
-            if (requiredCatalogVersion != NO_VERSION_REQUIREMENT && 
!isMetadataAvailableFor(requiredCatalogVersion, catalogService)) {
+            if (requiredCatalogVersion != NO_VERSION_REQUIRED && 
!isMetadataAvailableFor(requiredCatalogVersion, catalogService)) {
                 // TODO: IGNITE-20298 - throttle logging.
                 LOG.warn(
                         "Metadata not yet available, rejecting 
AppendEntriesRequest with EBUSY [group={}, requiredLevel={}].",
@@ -91,11 +90,11 @@ public class CheckCatalogVersionOnAppendEntries implements 
AppendEntriesRequestI
 
     private static int readRequiredCatalogVersionForMeta(ByteBuffer allData, 
final EntryMeta entry, Marshaller commandsMarshaller) {
         if (entry.type() != EntryType.ENTRY_TYPE_DATA) {
-            return NO_VERSION_REQUIREMENT;
+            return NO_VERSION_REQUIRED;
         }
 
         if (!(commandsMarshaller instanceof PartitionCommandsMarshaller)) {
-            return NO_VERSION_REQUIREMENT;
+            return NO_VERSION_REQUIRED;
         }
 
         PartitionCommandsMarshaller partitionCommandsMarshaller = 
(PartitionCommandsMarshaller) commandsMarshaller;
@@ -105,6 +104,6 @@ public class CheckCatalogVersionOnAppendEntries implements 
AppendEntriesRequestI
             return 
partitionCommandsMarshaller.readRequiredCatalogVersion(allData);
         }
 
-        return NO_VERSION_REQUIREMENT;
+        return NO_VERSION_REQUIRED;
     }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshaller.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshaller.java
index 19079a3528..7a00f2ac7a 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshaller.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshaller.java
@@ -18,18 +18,23 @@
 package org.apache.ignite.internal.table.distributed.schema;
 
 import java.nio.ByteBuffer;
-import org.apache.ignite.raft.jraft.util.Marshaller;
+import org.apache.ignite.internal.raft.Marshaller;
 
 /**
  * {@link Marshaller} that first writes some metadata about an object and then 
it writes the actual serialized
  * representation of the object.
  */
 public interface PartitionCommandsMarshaller extends Marshaller {
+    /**
+     * Used instead of a required catalog version when there is no requirement.
+     */
+    int NO_VERSION_REQUIRED = -1;
+
     /**
      * Reads required catalog version from the provided buffer.
      *
      * @param raw Buffer to read from.
-     * @return Catalog version.
+     * @return Catalog version. {@value #NO_VERSION_REQUIRED} if version is 
not required for the given command.
      */
     int readRequiredCatalogVersion(ByteBuffer raw);
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImpl.java
index faeae373e0..2e3dd47947 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImpl.java
@@ -27,12 +27,6 @@ import 
org.apache.ignite.network.serialization.MessageSerializationRegistry;
  * Default {@link PartitionCommandsMarshaller} implementation.
  */
 public class PartitionCommandsMarshallerImpl extends OptimizedMarshaller 
implements PartitionCommandsMarshaller {
-    /**
-     * Sent instead of a real required catalog version when there is no 
requirement.
-     * -1 so that it is encoded as 1 byte by {@link VarIntUtils}.
-     */
-    private static final int NO_VERSION_REQUIREMENT = -1;
-
     public PartitionCommandsMarshallerImpl(MessageSerializationRegistry 
serializationRegistry) {
         super(serializationRegistry);
     }
@@ -41,7 +35,7 @@ public class PartitionCommandsMarshallerImpl extends 
OptimizedMarshaller impleme
     public byte[] marshall(Object o) {
         int requiredCatalogVersion = o instanceof CatalogVersionAware
                 ? ((CatalogVersionAware) o).requiredCatalogVersion()
-                : NO_VERSION_REQUIREMENT;
+                : NO_VERSION_REQUIRED;
 
         stream.setBuffer(buffer);
         stream.writeInt(requiredCatalogVersion);
@@ -51,13 +45,15 @@ public class PartitionCommandsMarshallerImpl extends 
OptimizedMarshaller impleme
 
     @Override
     public <T> T unmarshall(ByteBuffer raw) {
-        skipRequiredCatalogVersion(raw);
+        int requiredCatalogVersion = readRequiredCatalogVersion(raw);
 
-        return super.unmarshall(raw);
-    }
+        T res = super.unmarshall(raw);
+
+        if (res instanceof CatalogVersionAware) {
+            ((CatalogVersionAware) 
res).requiredCatalogVersion(requiredCatalogVersion);
+        }
 
-    private void skipRequiredCatalogVersion(ByteBuffer raw) {
-        readRequiredCatalogVersion(raw);
+        return res;
     }
 
     /**
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 6f1cefff12..f6046b0163 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -279,7 +279,8 @@ public class TableManagerTest extends IgniteAbstractTest {
      */
     @Test
     public void testPreconfiguredTable() throws Exception {
-        when(rm.startRaftGroupService(any(), any(), any())).thenAnswer(mock -> 
completedFuture(mock(TopologyAwareRaftGroupService.class)));
+        when(rm.startRaftGroupService(any(), any(), any(), any()))
+                .thenAnswer(mock -> 
completedFuture(mock(TopologyAwareRaftGroupService.class)));
 
         TableManager tableManager = createTableManager(tblManagerFut);
 
@@ -449,7 +450,7 @@ public class TableManagerTest extends IgniteAbstractTest {
     private IgniteBiTuple<TableViewInternal, TableManager> 
startTableManagerStopTest() throws Exception {
         TableViewInternal table = 
mockManagersAndCreateTable(DYNAMIC_TABLE_FOR_DROP_NAME, tblManagerFut);
 
-        verify(rm, times(PARTITIONS)).startRaftGroupService(any(), any(), 
any());
+        verify(rm, times(PARTITIONS)).startRaftGroupService(any(), any(), 
any(), any());
 
         TableManager tableManager = tblManagerFut.join();
 
@@ -527,7 +528,8 @@ public class TableManagerTest extends IgniteAbstractTest {
      *         partition storage is emulated instead.
      */
     private void testStoragesGetClearedInMiddleOfFailedRebalance(boolean 
isTxStorageUnderRebalance) throws NodeStoppingException {
-        when(rm.startRaftGroupService(any(), any(), any())).thenAnswer(mock -> 
completedFuture(mock(TopologyAwareRaftGroupService.class)));
+        when(rm.startRaftGroupService(any(), any(), any(), any()))
+                .thenAnswer(mock -> 
completedFuture(mock(TopologyAwareRaftGroupService.class)));
         when(rm.raftNodeReadyFuture(any())).thenReturn(completedFuture(1L));
 
         createZone(1, 1);
@@ -609,7 +611,7 @@ public class TableManagerTest extends IgniteAbstractTest {
     ) throws Exception {
         String consistentId = "node0";
 
-        when(rm.startRaftGroupService(any(), any(), any())).thenAnswer(mock -> 
{
+        when(rm.startRaftGroupService(any(), any(), any(), 
any())).thenAnswer(mock -> {
             RaftGroupService raftGrpSrvcMock = 
mock(TopologyAwareRaftGroupService.class);
 
             when(raftGrpSrvcMock.leader()).thenReturn(new Peer(consistentId));
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequestTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequestTest.java
index aa7953c418..b6c3c8e073 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequestTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequestTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.table.distributed.schema;
 
+import static 
org.apache.ignite.utils.ClusterServiceTestUtils.defaultSerializationRegistry;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
@@ -83,6 +84,8 @@ class CheckCatalogVersionOnActionRequestTest extends 
BaseIgniteAbstractTest {
 
     private final PeerId leaderId = new PeerId("leader");
 
+    private PartitionCommandsMarshallerImpl commandsMarshaller;
+
     @BeforeEach
     void initMocks() {
         when(rpcContext.getNodeManager()).thenReturn(nodeManager);
@@ -91,16 +94,18 @@ class CheckCatalogVersionOnActionRequestTest extends 
BaseIgniteAbstractTest {
         lenient().when(node.getRaftOptions()).thenReturn(raftOptions);
         lenient().when(node.getNodeState()).thenReturn(State.STATE_LEADER);
         lenient().when(node.getLeaderId()).thenReturn(leaderId);
+
+        commandsMarshaller = new 
PartitionCommandsMarshallerImpl(defaultSerializationRegistry());
     }
 
     @Test
     void delegatesWhenCommandHasNoRequiredCatalogVersion() {
         ActionRequest request = raftMessagesFactory.writeActionRequest()
                 .groupId("test")
-                .command(commandWithoutRequiredCatalogVersion())
+                
.command(commandsMarshaller.marshall(commandWithoutRequiredCatalogVersion()))
                 .build();
 
-        assertThat(interceptor.intercept(rpcContext, request), 
is(nullValue()));
+        assertThat(interceptor.intercept(rpcContext, request, 
commandsMarshaller), is(nullValue()));
     }
 
     private WriteCommand commandWithoutRequiredCatalogVersion() {
@@ -113,10 +118,10 @@ class CheckCatalogVersionOnActionRequestTest extends 
BaseIgniteAbstractTest {
 
         ActionRequest request = raftMessagesFactory.writeActionRequest()
                 .groupId("test")
-                .command(commandWithRequiredCatalogVersion(3))
+                
.command(commandsMarshaller.marshall(commandWithRequiredCatalogVersion(3)))
                 .build();
 
-        assertThat(interceptor.intercept(rpcContext, request), 
is(nullValue()));
+        assertThat(interceptor.intercept(rpcContext, request, 
commandsMarshaller), is(nullValue()));
     }
 
     private WriteCommand commandWithRequiredCatalogVersion(int 
requiredVersion) {
@@ -135,10 +140,10 @@ class CheckCatalogVersionOnActionRequestTest extends 
BaseIgniteAbstractTest {
 
         ActionRequest request = raftMessagesFactory.writeActionRequest()
                 .groupId("test")
-                .command(commandWithRequiredCatalogVersion(6))
+                
.command(commandsMarshaller.marshall(commandWithRequiredCatalogVersion(6)))
                 .build();
 
-        Message result = interceptor.intercept(rpcContext, request);
+        Message result = interceptor.intercept(rpcContext, request, 
commandsMarshaller);
 
         assertThat(result, is(notNullValue()));
         assertThat(result, instanceOf(ErrorResponse.class));
@@ -156,10 +161,10 @@ class CheckCatalogVersionOnActionRequestTest extends 
BaseIgniteAbstractTest {
 
         ActionRequest request = raftMessagesFactory.writeActionRequest()
                 .groupId("test")
-                .command(commandWithRequiredCatalogVersion(6))
+                
.command(commandsMarshaller.marshall(commandWithRequiredCatalogVersion(6)))
                 .build();
 
-        Message result = interceptor.intercept(rpcContext, request);
+        Message result = interceptor.intercept(rpcContext, request, 
commandsMarshaller);
 
         assertThat(result, is(notNullValue()));
         assertThat(result, instanceOf(ErrorResponse.class));
@@ -182,10 +187,10 @@ class CheckCatalogVersionOnActionRequestTest extends 
BaseIgniteAbstractTest {
 
         ActionRequest request = raftMessagesFactory.writeActionRequest()
                 .groupId("test")
-                .command(commandWithRequiredCatalogVersion(6))
+                
.command(commandsMarshaller.marshall(commandWithRequiredCatalogVersion(6)))
                 .build();
 
-        Message result = interceptor.intercept(rpcContext, request);
+        Message result = interceptor.intercept(rpcContext, request, 
commandsMarshaller);
 
         assertThat(result, is(notNullValue()));
         assertThat(result, instanceOf(ErrorResponse.class));
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImplTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImplTest.java
index 53ca61dcb9..5f31c1a1f9 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImplTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImplTest.java
@@ -108,7 +108,7 @@ class PartitionCommandsMarshallerImplTest {
 
         byte[] serialized = partitionCommandsMarshaller.marshall(message);
 
-        FinishTxCommand unmarshalled = 
partitionCommandsMarshaller.unmarshall(ByteBuffer.wrap(serialized));
+        FinishTxCommand unmarshalled = 
partitionCommandsMarshaller.unmarshall(serialized);
 
         assertThat(unmarshalled.requiredCatalogVersion(), is(42));
 
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 84ef75e717..cb1f5b3334 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -21,7 +21,6 @@ import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toSet;
-import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.apache.ignite.internal.util.CollectionUtils.first;
 import static 
org.apache.ignite.utils.ClusterServiceTestUtils.findLocalAddresses;
@@ -91,7 +90,6 @@ import 
org.apache.ignite.internal.schema.configuration.GcConfiguration;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
-import org.apache.ignite.internal.storage.impl.TestMvTableStorage;
 import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
 import 
org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor.StorageHashIndexColumnDescriptor;
 import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
@@ -112,6 +110,7 @@ import 
org.apache.ignite.internal.table.distributed.replicator.TransactionStateR
 import 
org.apache.ignite.internal.table.distributed.schema.AlwaysSyncedSchemaSyncService;
 import 
org.apache.ignite.internal.table.distributed.schema.ConstantSchemaVersions;
 import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
+import 
org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller;
 import 
org.apache.ignite.internal.table.distributed.schema.ValidationSchemasSource;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
 import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
@@ -441,6 +440,9 @@ public class ItTxTestCluster {
 
         int globalIndexId = 1;
 
+        ThreadLocalPartitionCommandsMarshaller commandsMarshaller =
+                new 
ThreadLocalPartitionCommandsMarshaller(cluster.get(0).serializationRegistry());
+
         for (int p = 0; p < assignments.size(); p++) {
             Set<String> partAssignments = assignments.get(p);
 
@@ -449,7 +451,6 @@ public class ItTxTestCluster {
             for (String assignment : partAssignments) {
                 int partId = p;
 
-                var mvTableStorage = new TestMvTableStorage(tableId, 
DEFAULT_PARTITION_COUNT);
                 var mvPartStorage = new TestMvPartitionStorage(partId);
                 var txStateStorage = txStateStorages.get(assignment);
                 var transactionStateResolver = new TransactionStateResolver(
@@ -575,7 +576,7 @@ public class ItTxTestCluster {
 
             if (startClient) {
                 RaftGroupService service = RaftGroupServiceImpl
-                        .start(grpId, client, FACTORY, raftConfig, 
membersConf, true, executor)
+                        .start(grpId, client, FACTORY, raftConfig, 
membersConf, true, executor, commandsMarshaller)
                         .get(5, TimeUnit.SECONDS);
 
                 clients.put(p, service);
@@ -584,7 +585,7 @@ public class ItTxTestCluster {
                 ClusterService tmpSvc = cluster.get(0);
 
                 RaftGroupService service = RaftGroupServiceImpl
-                        .start(grpId, tmpSvc, FACTORY, raftConfig, 
membersConf, true, executor)
+                        .start(grpId, tmpSvc, FACTORY, raftConfig, 
membersConf, true, executor, commandsMarshaller)
                         .get(5, TimeUnit.SECONDS);
 
                 Peer leader = service.leader();
@@ -597,7 +598,7 @@ public class ItTxTestCluster {
                         .orElseThrow();
 
                 RaftGroupService leaderClusterSvc = RaftGroupServiceImpl
-                        .start(grpId, leaderSrv, FACTORY, raftConfig, 
membersConf, true, executor)
+                        .start(grpId, leaderSrv, FACTORY, raftConfig, 
membersConf, true, executor, commandsMarshaller)
                         .get(5, TimeUnit.SECONDS);
 
                 clients.put(p, leaderClusterSvc);


Reply via email to