This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 2c3ce15a0d IGNITE-22233 Add zone replica listener (#3931)
2c3ce15a0d is described below

commit 2c3ce15a0d99087c220e386b5e496b5dff9f755f
Author: Kirill Gusakov <[email protected]>
AuthorDate: Tue Jul 23 16:04:30 2024 +0300

    IGNITE-22233 Add zone replica listener (#3931)
---
 .../replicator/ItReplicaLifecycleTest.java         |  94 ++++++--
 .../PartitionReplicaLifecycleManager.java          |  36 ++-
 .../replicator/ZonePartitionRaftListener.java      |  48 +++-
 .../replicator/ZonePartitionReplicaListener.java   |  94 +++++++-
 .../ReadOnlyDirectMultiRowReplicaRequest.java      |   3 +-
 .../ReadOnlyDirectSingleRowReplicaRequest.java     |   3 +-
 .../ReadOnlyMultiRowPkReplicaRequest.java          |   3 +-
 .../ReadOnlyScanRetrieveBatchReplicaRequest.java   |   3 +-
 .../ReadOnlySingleRowPkReplicaRequest.java         |   3 +-
 .../ReadWriteMultiRowPkReplicaRequest.java         |   3 +-
 .../ReadWriteMultiRowReplicaRequest.java           |   3 +-
 .../ReadWriteScanRetrieveBatchReplicaRequest.java  |   3 +-
 .../ReadWriteSingleRowPkReplicaRequest.java        |   3 +-
 .../ReadWriteSingleRowReplicaRequest.java          |   3 +-
 .../ReadWriteSwapRowReplicaRequest.java            |   3 +-
 .../ItPlacementDriverReplicaSideTest.java          |   2 +-
 .../apache/ignite/internal/replicator/Replica.java |   9 +
 .../ignite/internal/replicator/ReplicaImpl.java    |   5 +
 .../ignite/internal/replicator/ReplicaManager.java |  20 +-
 .../replicator/ZonePartitionReplicaImpl.java       |   8 +-
 .../internal/replicator/message/TableAware.java}   |  13 +-
 .../internal/replicator/ReplicaManagerTest.java    |   2 +-
 .../runner/app/ItIgniteNodeRestartTest.java        |  14 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |   6 +-
 .../sql/engine/exec/UpdatableTableImpl.java        |   3 +
 .../ignite/distributed/ReplicaUnavailableTest.java |  13 +-
 .../rebalance/ItRebalanceDistributedTest.java      |  16 +-
 .../internal/table/distributed/TableManager.java   | 250 +++++++++++++++++----
 .../distributed/storage/InternalTableImpl.java     |  19 ++
 .../distributed/TableManagerRecoveryTest.java      |  15 +-
 .../table/distributed/TableManagerTest.java        |  10 +-
 .../PartitionReplicaListenerIndexLockingTest.java  |   4 +
 .../replication/PartitionReplicaListenerTest.java  |  35 +++
 .../ignite/internal/tx/impl/TxMessageSender.java   |   6 +
 .../tx/message/TxFinishReplicaRequest.java         |   7 +
 35 files changed, 634 insertions(+), 128 deletions(-)

diff --git 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
index 6b683ea4f0..3524d04942 100644
--- 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
+++ 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
@@ -63,7 +63,9 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.function.LongFunction;
 import java.util.function.LongSupplier;
 import java.util.stream.Collectors;
@@ -137,6 +139,7 @@ import 
org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
 import 
org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.internal.schema.SchemaManager;
 import org.apache.ignite.internal.schema.configuration.GcConfiguration;
 import 
org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration;
@@ -164,17 +167,21 @@ import org.apache.ignite.internal.tx.LockManager;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
+import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
+import org.apache.ignite.internal.tx.impl.PublicApiThreadingIgniteTransactions;
 import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
 import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
 import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
 import org.apache.ignite.internal.tx.message.TxMessageGroup;
+import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
 import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
 import org.apache.ignite.sql.IgniteSql;
 import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.tx.IgniteTransactions;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
@@ -338,7 +345,7 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
     }
 
     @Test
-    public void testEmptyReplicaListener(TestInfo testInfo) throws Exception {
+    public void testZoneReplicaListener(TestInfo testInfo) throws Exception {
         startNodes(testInfo, 3);
 
         Assignment replicaAssignment = (Assignment) 
AffinityUtils.calculateAssignmentForPartition(
@@ -351,17 +358,54 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
         createZone(node, "test_zone", 1, 1);
         int zoneId = DistributionZonesTestUtil.getZoneId(node.catalogManager, 
"test_zone", node.hybridClock.nowLong());
 
-        createTable(node, "test_zone", "test_table");
-        int tableId = TableTestUtils.getTableId(node.catalogManager, 
"test_table", node.hybridClock.nowLong());
+        long key = 1;
 
-        node.converter.put(new TablePartitionId(tableId, 0), new 
ZonePartitionId(zoneId, 0));
+        {
+            createTable(node, "test_zone", "test_table");
+            int tableId = TableTestUtils.getTableId(node.catalogManager, 
"test_table", node.hybridClock.nowLong());
 
-        KeyValueView<Long, Integer> keyValueView = 
node.tableManager.table(tableId).keyValueView(Long.class, Integer.class);
+            prepareTableIdToZoneIdConverter(
+                    node,
+                    new TablePartitionId(tableId, 0),
+                    new ZonePartitionId(zoneId, 0)
+            );
+
+            KeyValueView<Long, Integer> keyValueView = 
node.tableManager.table(tableId).keyValueView(Long.class, Integer.class);
+
+            int val = 100;
+
+            node.transactions().runInTransaction(tx -> {
+                assertDoesNotThrow(() -> keyValueView.put(tx, key, val));
+
+                assertEquals(val, keyValueView.get(tx, key));
+            });
+
+            node.transactions().runInTransaction(tx -> {
+                // Check the replica read inside another transaction
+                assertEquals(val, keyValueView.get(tx, key));
+            });
+        }
+
+        {
+            createTable(node, "test_zone", "test_table1");
+            int tableId = TableTestUtils.getTableId(node.catalogManager, 
"test_table1", node.hybridClock.nowLong());
+
+            prepareTableIdToZoneIdConverter(
+                    node,
+                    new TablePartitionId(tableId, 0),
+                    new ZonePartitionId(zoneId, 0)
+            );
+
+            KeyValueView<Long, Integer> keyValueView = 
node.tableManager.table(tableId).keyValueView(Long.class, Integer.class);
+
+            int val = 200;
 
-        assertDoesNotThrow(() -> keyValueView.put(null, 1L, 1));
+            node.transactions().runInTransaction(tx -> {
+                assertDoesNotThrow(() -> keyValueView.put(tx, key, val));
 
-        // Actually we are testing not the fair put value, but the hardcoded 
one from temporary noop replica listener
-        assertEquals(-1, keyValueView.get(null, 1L));
+                assertEquals(val, keyValueView.get(tx, key));
+            });
+        }
     }
 
     @Test
@@ -682,6 +726,18 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
         assertTrue(waitForCondition(() -> 
getNode(0).replicaManager.isReplicaStarted(partId), 10_000L));
     }
 
+    private void prepareTableIdToZoneIdConverter(Node node, TablePartitionId 
tablePartitionId, ZonePartitionId zonePartitionId) {
+        node.converter.set(request ->  {
+            if 
(request.groupId().asReplicationGroupId().equals(tablePartitionId)
+                    && !(request instanceof WriteIntentSwitchReplicaRequest)) {
+                return zonePartitionId;
+            } else {
+                return request.groupId().asReplicationGroupId();
+            }
+        });
+
+    }
+
     private Node getNode(int nodeIndex) {
         return nodes.get(nodeIndex);
     }
@@ -790,12 +846,15 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
 
         private final ScheduledExecutorService rebalanceScheduler;
 
-        private final Map<ReplicationGroupId, ReplicationGroupId> converter = 
new ConcurrentHashMap<>();
+        private AtomicReference<Function<ReplicaRequest, ReplicationGroupId>> 
converter =
+                new AtomicReference<>(request -> 
request.groupId().asReplicationGroupId());
 
         private final LogStorageFactory logStorageFactory;
 
         private final IndexMetaStorage indexMetaStorage;
 
+        private final HybridTimestampTracker observableTimestampTracker = new 
HybridTimestampTracker();
+
         /**
          * Constructor that simply creates a subset of components of this node.
          */
@@ -1026,7 +1085,7 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
                     raftManager,
                     view -> new LocalLogStorageFactory(),
                     ForkJoinPool.commonPool(),
-                    t -> (converter.get(t) != null) ? converter.get(t) : t
+                    t -> converter.get().apply(t)
             );
 
             LongSupplier delayDurationMsSupplier = () -> 10L;
@@ -1064,7 +1123,8 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
                     clusterService.topologyService(),
                     lowWatermark,
                     threadPoolsManager.tableIoExecutor(),
-                    rebalanceScheduler
+                    rebalanceScheduler,
+                    threadPoolsManager.partitionOperationsExecutor()
             );
 
             StorageUpdateConfiguration storageUpdateConfiguration = 
clusterConfigRegistry.getConfiguration(StorageUpdateConfiguration.KEY);
@@ -1081,7 +1141,7 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
                     clusterService.topologyService(),
                     clusterService.serializationRegistry(),
                     replicaManager,
-                    mock(LockManager.class),
+                    lockManager,
                     replicaSvc,
                     txManager,
                     dataStorageMgr,
@@ -1097,14 +1157,15 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
                     distributionZoneManager,
                     schemaSyncService,
                     catalogManager,
-                    new HybridTimestampTracker(),
+                    observableTimestampTracker,
                     placementDriver,
                     () -> mock(IgniteSql.class),
                     resourcesRegistry,
                     lowWatermark,
                     transactionInflights,
                     indexMetaStorage,
-                    logSyncer
+                    logSyncer,
+                    partitionReplicaLifecycleManager
             );
 
             
tableManager.setStreamerReceiverRunner(mock(StreamerReceiverRunner.class));
@@ -1119,6 +1180,11 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
             );
         }
 
+        private IgniteTransactions transactions() {
+            IgniteTransactionsImpl transactions = new 
IgniteTransactionsImpl(txManager, observableTimestampTracker);
+            return new PublicApiThreadingIgniteTransactions(transactions, 
ForkJoinPool.commonPool());
+        }
+
         private void waitForMetadataCompletenessAtNow() {
             
assertThat(schemaSyncService.waitForMetadataCompleteness(hybridClock.now()), 
willCompleteSuccessfully());
         }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index 1b7e7db986..e1b36dfdf8 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -61,6 +61,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -97,6 +98,7 @@ import org.apache.ignite.internal.metastorage.dsl.Condition;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
 import org.apache.ignite.internal.network.TopologyService;
 import 
org.apache.ignite.internal.partition.replicator.snapshot.FailFastSnapshotStorageFactory;
+import org.apache.ignite.internal.raft.ExecutorInclinedRaftCommandRunner;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.PeersAndLearners;
 import org.apache.ignite.internal.raft.service.LeaderWithTerm;
@@ -162,6 +164,11 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
     /** Executor for scheduling rebalance routine. */
     private final ScheduledExecutorService rebalanceScheduler;
 
+    /**
+     * Executes partition operations (that might cause I/O and/or be blocked 
on locks).
+     */
+    private final Executor partitionOperationsExecutor;
+
     /**
      * The constructor.
      *
@@ -170,7 +177,9 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
      * @param distributionZoneMgr Distribution zone manager.
      * @param metaStorageMgr Metastorage manager.
      * @param topologyService Topology service.
-     * @param ioExecutor Separate executor for IO operations.
+     * @param rebalanceScheduler Executor for scheduling rebalance routine.
+     * @param partitionOperationsExecutor Striped executor on which partition 
operations (potentially requiring I/O with storages)
+     *     will be executed.
      */
     public PartitionReplicaLifecycleManager(
             CatalogManager catalogMgr,
@@ -180,7 +189,8 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
             TopologyService topologyService,
             LowWatermark lowWatermark,
             ExecutorService ioExecutor,
-            ScheduledExecutorService rebalanceScheduler
+            ScheduledExecutorService rebalanceScheduler,
+            Executor partitionOperationsExecutor
     ) {
         this.catalogMgr = catalogMgr;
         this.replicaMgr = replicaMgr;
@@ -190,6 +200,7 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
         this.lowWatermark = lowWatermark;
         this.ioExecutor = ioExecutor;
         this.rebalanceScheduler = rebalanceScheduler;
+        this.partitionOperationsExecutor = partitionOperationsExecutor;
 
         pendingAssignmentsRebalanceListener = 
createPendingAssignmentsRebalanceListener();
         stableAssignmentsRebalanceListener = 
createStableAssignmentsRebalanceListener();
@@ -404,7 +415,8 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
         try {
             return replicaMgr.startReplica(
                     replicaGrpId,
-                    new ZonePartitionReplicaListener(),
+                    (raftClient) -> new ZonePartitionReplicaListener(
+                            new ExecutorInclinedRaftCommandRunner(raftClient, 
partitionOperationsExecutor)),
                     new FailFastSnapshotStorageFactory(),
                     stablePeersAndLearners,
                     raftGroupListener,
@@ -427,13 +439,6 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
         });
     }
 
-    private boolean shouldStartLocally(Assignments assignments) {
-        return assignments
-                .nodes()
-                .stream()
-                .anyMatch(a -> a.consistentId().equals(localNode().name()));
-    }
-
     private ClusterNode localNode() {
         return topologyService.localMember();
     }
@@ -579,6 +584,17 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
         return assignmentsFuture;
     }
 
+    /**
+     * Checks whether the current node has a local replica for this {@link 
ZonePartitionId}.
+     *
+     * @param zonePartitionId Zone partition id.
+     * @return true if local replica exists, false otherwise.
+     */
+    // TODO: https://issues.apache.org/jira/browse/IGNITE-22624 replace this 
method by the replicas await process.
+    public boolean hasLocalPartition(ZonePartitionId zonePartitionId) {
+        return replicationGroupIds.contains(zonePartitionId);
+    }
+
 
     /**
      * Creates meta storage listener for pending assignments updates.
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionRaftListener.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionRaftListener.java
index 47523b102b..2895a0b3c0 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionRaftListener.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionRaftListener.java
@@ -17,27 +17,71 @@
 
 package org.apache.ignite.internal.partition.replicator;
 
+import static org.apache.ignite.internal.tx.TxState.ABORTED;
+import static org.apache.ignite.internal.tx.TxState.COMMITTED;
+
+import java.io.Serializable;
 import java.nio.file.Path;
 import java.util.Iterator;
+import java.util.concurrent.CompletionException;
 import java.util.function.Consumer;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import 
org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommand;
+import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.ReadCommand;
 import org.apache.ignite.internal.raft.WriteCommand;
 import org.apache.ignite.internal.raft.service.CommandClosure;
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
+import org.apache.ignite.internal.tx.TransactionResult;
 
 /**
  * RAFT listener for the zone partition.
  */
 public class ZonePartitionRaftListener implements RaftGroupListener {
+    private static final IgniteLogger LOG = 
Loggers.forClass(ZonePartitionRaftListener.class);
 
     @Override
     public void onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
-        // No-op
+        iterator.forEachRemaining((CommandClosure<? extends ReadCommand> clo) 
-> {
+            Command command = clo.command();
+
+            assert false : "No read commands expected, [cmd=" + command + ']';
+        });
     }
 
     @Override
     public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
-        // No-op
+        iterator.forEachRemaining((CommandClosure<? extends WriteCommand> clo) 
-> {
+            Command command = clo.command();
+
+            Serializable result = null;
+
+            try {
+                if (command instanceof FinishTxCommand) {
+                    FinishTxCommand cmd = (FinishTxCommand) command;
+
+                    result = new TransactionResult(cmd.commit() ? COMMITTED : 
ABORTED, cmd.commitTimestamp());
+                } else {
+                    LOG.debug("Message type " + command.getClass() + " is not 
supported by the zone partition RAFT listener yet");
+                }
+            } catch (IgniteInternalException e) {
+                result = e;
+            } catch (CompletionException e) {
+                result = e.getCause();
+            } catch (Throwable t) {
+                LOG.error(
+                        "Unknown error while processing command 
[commandIndex={}, commandTerm={}, command={}]",
+                        t,
+                        clo.index(), clo.term(), command
+                );
+
+                throw t;
+            }
+
+            clo.result(result);
+        });
     }
 
     @Override
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
index 49142b0a29..45c30025fd 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
@@ -17,31 +17,107 @@
 
 package org.apache.ignite.internal.partition.replicator;
 
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.raft.service.RaftCommandRunner;
 import org.apache.ignite.internal.replicator.ReplicaResult;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
 import org.apache.ignite.internal.replicator.listener.ReplicaListener;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
-import org.apache.ignite.internal.schema.BinaryRowImpl;
+import org.apache.ignite.internal.replicator.message.TableAware;
+import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
 
 /**
  * Zone partition replica listener.
  */
 public class ZonePartitionReplicaListener implements ReplicaListener {
+    private static final TxMessagesFactory TX_MESSAGES_FACTORY = new 
TxMessagesFactory();
+
+    private static final IgniteLogger LOG = 
Loggers.forClass(ZonePartitionReplicaListener.class);
+
+    // TODO: https://issues.apache.org/jira/browse/IGNITE-22624 wait for the 
table replica listener if needed.
+    private final Map<TablePartitionId, ReplicaListener> replicas = new 
ConcurrentHashMap<>();
+
+    private final RaftCommandRunner raftClient;
+
+    /**
+     * The constructor.
+     *
+     * @param raftClient Raft client.
+     */
+    public ZonePartitionReplicaListener(RaftCommandRunner raftClient) {
+        this.raftClient = raftClient;
+    }
 
     @Override
     public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, 
String senderId) {
-        var res = new BinaryRowImpl(
-                1,
-                new BinaryTupleBuilder(2).appendLong(1).appendInt(-1).build());
-        return CompletableFuture.completedFuture(new ReplicaResult(
-                res,
-                CompletableFuture.completedFuture(res)));
+        if (!(request instanceof TableAware)) {
+            // TODO: https://issues.apache.org/jira/browse/IGNITE-22620 
implement ReplicaSafeTimeSyncRequest processing.
+            // TODO: https://issues.apache.org/jira/browse/IGNITE-22621 
implement zone-based transaction storage
+            //  and txn messages processing
+            if (request instanceof TxFinishReplicaRequest) {
+                TxFinishReplicaRequest txFinishReplicaRequest = 
(TxFinishReplicaRequest) request;
+
+                TxFinishReplicaRequest requestForTableListener = 
TX_MESSAGES_FACTORY.txFinishReplicaRequest()
+                        .txId(txFinishReplicaRequest.txId())
+                        
.commitPartitionId(txFinishReplicaRequest.commitPartitionId())
+                        .timestamp(txFinishReplicaRequest.timestamp())
+                        .groupId(txFinishReplicaRequest.commitPartitionId())
+                        .groups(txFinishReplicaRequest.groups())
+                        .commit(txFinishReplicaRequest.commit())
+                        
.commitTimestamp(txFinishReplicaRequest.commitTimestamp())
+                        
.enlistmentConsistencyToken(txFinishReplicaRequest.enlistmentConsistencyToken())
+                        .build();
+
+                return replicas
+                        
.get(txFinishReplicaRequest.commitPartitionId().asTablePartitionId())
+                        .invoke(requestForTableListener, senderId);
+            } else {
+                LOG.debug("Non table request is not supported by the zone 
partition yet " + request);
+            }
+
+            return nullCompletedFuture();
+        } else {
+            int partitionId;
+
+            ReplicationGroupId replicationGroupId = 
request.groupId().asReplicationGroupId();
+
+            // TODO: https://issues.apache.org/jira/browse/IGNITE-22522 Refine 
this code when the zone based replication is done.
+            if (replicationGroupId instanceof  TablePartitionId) {
+                partitionId = ((TablePartitionId) 
replicationGroupId).partitionId();
+            } else if (replicationGroupId instanceof ZonePartitionId) {
+                partitionId = ((ZonePartitionId) 
replicationGroupId).partitionId();
+            } else {
+                throw new IllegalArgumentException("Requests with replication 
group type "
+                        + request.groupId().getClass() + " is not supported");
+            }
+
+            return replicas.get(new TablePartitionId(((TableAware) 
request).tableId(), partitionId))
+                    .invoke(request, senderId);
+        }
     }
 
     @Override
     public RaftCommandRunner raftClient() {
-        throw new UnsupportedOperationException("Raft client is not defined in 
ZoneReplicaListener");
+        return raftClient;
+    }
+
+    /**
+     * Add table partition listener to the current zone replica listener.
+     *
+     * @param partitionId Table partition id.
+     * @param replicaListener Table replica listener.
+     */
+    public void addTableReplicaListener(TablePartitionId partitionId, 
Function<RaftCommandRunner, ReplicaListener> replicaListener) {
+        replicas.put(partitionId, replicaListener.apply(raftClient));
     }
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyDirectMultiRowReplicaRequest.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyDirectMultiRowReplicaRequest.java
index 7aa1c16aab..7480f17363 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyDirectMultiRowReplicaRequest.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyDirectMultiRowReplicaRequest.java
@@ -20,10 +20,11 @@ package 
org.apache.ignite.internal.partition.replicator.network.replication;
 import org.apache.ignite.internal.network.annotations.Transferable;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
 import 
org.apache.ignite.internal.replicator.message.ReadOnlyDirectReplicaRequest;
+import org.apache.ignite.internal.replicator.message.TableAware;
 
 /**
  * Read only direct multi row replica request.
  */
 
@Transferable(PartitionReplicationMessageGroup.RO_DIRECT_MULTI_ROW_REPLICA_REQUEST)
-public interface ReadOnlyDirectMultiRowReplicaRequest extends 
MultipleRowPkReplicaRequest, ReadOnlyDirectReplicaRequest {
+public interface ReadOnlyDirectMultiRowReplicaRequest extends 
MultipleRowPkReplicaRequest, ReadOnlyDirectReplicaRequest, TableAware {
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyDirectSingleRowReplicaRequest.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyDirectSingleRowReplicaRequest.java
index 52a73c9cb4..51309a7fcc 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyDirectSingleRowReplicaRequest.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyDirectSingleRowReplicaRequest.java
@@ -20,11 +20,12 @@ package 
org.apache.ignite.internal.partition.replicator.network.replication;
 import org.apache.ignite.internal.network.annotations.Transferable;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
 import 
org.apache.ignite.internal.replicator.message.ReadOnlyDirectReplicaRequest;
+import org.apache.ignite.internal.replicator.message.TableAware;
 
 /**
  * Read only direct node single row replica request.
  * The type of RO request never waits and is executed at the current node 
timestamp.
  */
 
@Transferable(PartitionReplicationMessageGroup.RO_DIRECT_SINGLE_ROW_REPLICA_REQUEST)
-public interface ReadOnlyDirectSingleRowReplicaRequest extends 
SingleRowPkReplicaRequest, ReadOnlyDirectReplicaRequest {
+public interface ReadOnlyDirectSingleRowReplicaRequest extends 
SingleRowPkReplicaRequest, ReadOnlyDirectReplicaRequest, TableAware {
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyMultiRowPkReplicaRequest.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyMultiRowPkReplicaRequest.java
index f10384436e..4a8e6354b0 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyMultiRowPkReplicaRequest.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyMultiRowPkReplicaRequest.java
@@ -19,10 +19,11 @@ package 
org.apache.ignite.internal.partition.replicator.network.replication;
 
 import org.apache.ignite.internal.network.annotations.Transferable;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
+import org.apache.ignite.internal.replicator.message.TableAware;
 
 /**
  * Read only multi row replica request.
  */
 @Transferable(PartitionReplicationMessageGroup.RO_MULTI_ROW_REPLICA_REQUEST)
-public interface ReadOnlyMultiRowPkReplicaRequest extends 
MultipleRowPkReplicaRequest, ReadOnlyReplicaRequest {
+public interface ReadOnlyMultiRowPkReplicaRequest extends 
MultipleRowPkReplicaRequest, ReadOnlyReplicaRequest, TableAware {
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyScanRetrieveBatchReplicaRequest.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyScanRetrieveBatchReplicaRequest.java
index b4d65621bf..bd17616633 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyScanRetrieveBatchReplicaRequest.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyScanRetrieveBatchReplicaRequest.java
@@ -20,12 +20,13 @@ package 
org.apache.ignite.internal.partition.replicator.network.replication;
 import java.util.UUID;
 import org.apache.ignite.internal.network.annotations.Transferable;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
+import org.apache.ignite.internal.replicator.message.TableAware;
 
 /**
  * Scan retrieve batch replica request.
  */
 
@Transferable(PartitionReplicationMessageGroup.RO_SCAN_RETRIEVE_BATCH_REPLICA_REQUEST)
-public interface ReadOnlyScanRetrieveBatchReplicaRequest extends 
ScanRetrieveBatchReplicaRequest, ReadOnlyReplicaRequest {
+public interface ReadOnlyScanRetrieveBatchReplicaRequest extends 
ScanRetrieveBatchReplicaRequest, ReadOnlyReplicaRequest, TableAware {
     UUID transactionId();
 
     /**
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlySingleRowPkReplicaRequest.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlySingleRowPkReplicaRequest.java
index c841fc0c05..82bd1f6fc5 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlySingleRowPkReplicaRequest.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlySingleRowPkReplicaRequest.java
@@ -19,10 +19,11 @@ package 
org.apache.ignite.internal.partition.replicator.network.replication;
 
 import org.apache.ignite.internal.network.annotations.Transferable;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
+import org.apache.ignite.internal.replicator.message.TableAware;
 
 /**
  * Read only single row replica request.
  */
 @Transferable(PartitionReplicationMessageGroup.RO_SINGLE_ROW_REPLICA_REQUEST)
-public interface ReadOnlySingleRowPkReplicaRequest extends 
SingleRowPkReplicaRequest, ReadOnlyReplicaRequest {
+public interface ReadOnlySingleRowPkReplicaRequest extends 
SingleRowPkReplicaRequest, ReadOnlyReplicaRequest, TableAware {
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteMultiRowPkReplicaRequest.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteMultiRowPkReplicaRequest.java
index 1be4c532eb..919e93dff0 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteMultiRowPkReplicaRequest.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteMultiRowPkReplicaRequest.java
@@ -19,12 +19,13 @@ package 
org.apache.ignite.internal.partition.replicator.network.replication;
 
 import org.apache.ignite.internal.network.annotations.Transferable;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
+import org.apache.ignite.internal.replicator.message.TableAware;
 
 /**
  * Read-write multi-row replica request involving table's Primary Keys.
  */
 @Transferable(PartitionReplicationMessageGroup.RW_MULTI_ROW_PK_REPLICA_REQUEST)
-public interface ReadWriteMultiRowPkReplicaRequest extends 
MultipleRowPkReplicaRequest, ReadWriteReplicaRequest {
+public interface ReadWriteMultiRowPkReplicaRequest extends 
MultipleRowPkReplicaRequest, ReadWriteReplicaRequest, TableAware {
     /**
      * Disable delayed ack optimization.
      *
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteMultiRowReplicaRequest.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteMultiRowReplicaRequest.java
index 0e1e26066b..c8d85755d2 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteMultiRowReplicaRequest.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteMultiRowReplicaRequest.java
@@ -20,13 +20,14 @@ package 
org.apache.ignite.internal.partition.replicator.network.replication;
 import java.util.BitSet;
 import org.apache.ignite.internal.network.annotations.Transferable;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
+import org.apache.ignite.internal.replicator.message.TableAware;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Read-write multi-row replica request.
  */
 @Transferable(PartitionReplicationMessageGroup.RW_MULTI_ROW_REPLICA_REQUEST)
-public interface ReadWriteMultiRowReplicaRequest extends 
MultipleRowReplicaRequest, ReadWriteReplicaRequest {
+public interface ReadWriteMultiRowReplicaRequest extends 
MultipleRowReplicaRequest, ReadWriteReplicaRequest, TableAware {
     /**
      * Disable delayed ack optimization.
      *
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteScanRetrieveBatchReplicaRequest.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteScanRetrieveBatchReplicaRequest.java
index 2d0cbb02ea..be2f0f24ff 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteScanRetrieveBatchReplicaRequest.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteScanRetrieveBatchReplicaRequest.java
@@ -19,10 +19,11 @@ package 
org.apache.ignite.internal.partition.replicator.network.replication;
 
 import org.apache.ignite.internal.network.annotations.Transferable;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
+import org.apache.ignite.internal.replicator.message.TableAware;
 
 /**
  * Scan retrieve batch replica request.
  */
 
@Transferable(PartitionReplicationMessageGroup.RW_SCAN_RETRIEVE_BATCH_REPLICA_REQUEST)
-public interface ReadWriteScanRetrieveBatchReplicaRequest extends 
ScanRetrieveBatchReplicaRequest, ReadWriteReplicaRequest {
+public interface ReadWriteScanRetrieveBatchReplicaRequest extends 
ScanRetrieveBatchReplicaRequest, ReadWriteReplicaRequest, TableAware {
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteSingleRowPkReplicaRequest.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteSingleRowPkReplicaRequest.java
index 2f27d6b42d..e0c13a3aec 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteSingleRowPkReplicaRequest.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteSingleRowPkReplicaRequest.java
@@ -19,10 +19,11 @@ package 
org.apache.ignite.internal.partition.replicator.network.replication;
 
 import org.apache.ignite.internal.network.annotations.Transferable;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
+import org.apache.ignite.internal.replicator.message.TableAware;
 
 /**
  * Read-write single-row replica request involving a table's Primary Key..
  */
 
@Transferable(PartitionReplicationMessageGroup.RW_SINGLE_ROW_PK_REPLICA_REQUEST)
-public interface ReadWriteSingleRowPkReplicaRequest extends 
SingleRowPkReplicaRequest, ReadWriteReplicaRequest {
+public interface ReadWriteSingleRowPkReplicaRequest extends 
SingleRowPkReplicaRequest, ReadWriteReplicaRequest, TableAware {
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteSingleRowReplicaRequest.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteSingleRowReplicaRequest.java
index a8d5e266b7..4f33af949a 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteSingleRowReplicaRequest.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteSingleRowReplicaRequest.java
@@ -19,10 +19,11 @@ package 
org.apache.ignite.internal.partition.replicator.network.replication;
 
 import org.apache.ignite.internal.network.annotations.Transferable;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
+import org.apache.ignite.internal.replicator.message.TableAware;
 
 /**
  * Read-write single-row replica request.
  */
 @Transferable(PartitionReplicationMessageGroup.RW_SINGLE_ROW_REPLICA_REQUEST)
-public interface ReadWriteSingleRowReplicaRequest extends 
SingleRowReplicaRequest, ReadWriteReplicaRequest {
+public interface ReadWriteSingleRowReplicaRequest extends 
SingleRowReplicaRequest, ReadWriteReplicaRequest, TableAware {
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteSwapRowReplicaRequest.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteSwapRowReplicaRequest.java
index 42322a70ef..65ee84ac86 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteSwapRowReplicaRequest.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteSwapRowReplicaRequest.java
@@ -19,11 +19,12 @@ package 
org.apache.ignite.internal.partition.replicator.network.replication;
 
 import org.apache.ignite.internal.network.annotations.Transferable;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
+import org.apache.ignite.internal.replicator.message.TableAware;
 
 /**
  * Read-write dual row replica request.
  */
 @Transferable(PartitionReplicationMessageGroup.RW_DUAL_ROW_REPLICA_REQUEST)
-public interface ReadWriteSwapRowReplicaRequest extends SwapRowReplicaRequest, 
ReadWriteReplicaRequest {
+public interface ReadWriteSwapRowReplicaRequest extends SwapRowReplicaRequest, 
ReadWriteReplicaRequest, TableAware {
 
 }
diff --git 
a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
 
b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
index 6ebb2e6292..51f54bd185 100644
--- 
a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
+++ 
b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
@@ -505,7 +505,7 @@ public class ItPlacementDriverReplicaSideTest extends 
IgniteAbstractTest {
             );
             serviceFutures.add(raftClientFut);
 
-            CompletableFuture<Boolean> replicaFuture = 
raftClientFut.thenCompose(raftClient -> {
+            CompletableFuture<Replica> replicaFuture = 
raftClientFut.thenCompose(raftClient -> {
                 try {
                     ReplicaListener listener = new ReplicaListener() {
                         @Override
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
index 3974c139fa..b9141c3f98 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
@@ -21,6 +21,7 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.network.NetworkMessage;
 import 
org.apache.ignite.internal.placementdriver.message.PlacementDriverReplicaMessage;
 import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
+import org.apache.ignite.internal.replicator.listener.ReplicaListener;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 
 /**
@@ -36,6 +37,14 @@ public interface Replica {
     @Deprecated(forRemoval = true)
     TopologyAwareRaftGroupService raftClient();
 
+    /**
+     * Returns the replica's listener.
+     *
+     * @return Replica's listener.
+     */
+    @Deprecated(forRemoval = true)
+    ReplicaListener listener();
+
     /**
      * Processes a replication request on the replica.
      *
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaImpl.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaImpl.java
index 86dc454979..ef1c673869 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaImpl.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaImpl.java
@@ -133,6 +133,11 @@ public class ReplicaImpl implements Replica {
         raftClient.subscribeLeader(this::onLeaderElected);
     }
 
+    @Override
+    public ReplicaListener listener() {
+        return listener;
+    }
+
     @Override
     public final TopologyAwareRaftGroupService raftClient() {
         return (TopologyAwareRaftGroupService) listener.raftClient();
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 06f87a580a..344b966318 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -208,7 +208,7 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
 
     /* Temporary converter to support the zone based partitions in tests. **/
     // TODO: https://issues.apache.org/jira/browse/IGNITE-22522 remove this 
code
-    private Function<ReplicationGroupId, ReplicationGroupId> groupIdConverter 
= Function.identity();
+    private Function<ReplicaRequest, ReplicationGroupId> groupIdConverter = r 
-> r.groupId().asReplicationGroupId();
 
     /**
      * Constructor for a replica service.
@@ -246,7 +246,7 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
             RaftManager raftManager,
             LogStorageFactoryCreator volatileLogStorageFactoryCreator,
             Executor replicaStartStopExecutor,
-            Function<ReplicationGroupId, ReplicationGroupId> groupIdConverter
+            Function<ReplicaRequest, ReplicationGroupId> groupIdConverter
     ) {
         this(
                 nodeName,
@@ -392,7 +392,7 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
             return;
         }
 
-        ReplicationGroupId groupId = 
groupIdConverter.apply(request.groupId().asReplicationGroupId());
+        ReplicationGroupId groupId = groupIdConverter.apply(request);
 
         String senderConsistentId = sender.name();
 
@@ -573,7 +573,7 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
         });
     }
 
-    private CompletableFuture<Boolean> startReplicaInternal(
+    private CompletableFuture<Replica> startReplicaInternal(
             RaftGroupEventsListener raftGroupEventsListener,
             RaftGroupListener raftGroupListener,
             boolean isVolatileStorage,
@@ -628,7 +628,7 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
      *
      * @return Future that promises ready new replica when done.
      */
-    public CompletableFuture<Boolean> startReplica(
+    public CompletableFuture<Replica> startReplica(
             RaftGroupEventsListener raftGroupEventsListener,
             RaftGroupListener raftGroupListener,
             boolean isVolatileStorage,
@@ -663,7 +663,6 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
      * Starts a replica. If a replica with the same partition id already 
exists, the method throws an exception.
      *
      * @param replicaGrpId Replication group id.
-     * @param listener Replica listener.
      * @param snapshotStorageFactory Snapshot storage factory for raft group 
option's parameterization.
      * @param newConfiguration A configuration for new raft group.
      * @param raftGroupListener Raft group listener for raft group starting.
@@ -674,7 +673,7 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
      */
     public CompletableFuture<Replica> startReplica(
             ReplicationGroupId replicaGrpId,
-            ReplicaListener listener,
+            Function<RaftGroupService, ReplicaListener> listener,
             SnapshotStorageFactory snapshotStorageFactory,
             PeersAndLearners newConfiguration,
             RaftGroupListener raftGroupListener,
@@ -706,7 +705,7 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
 
                 Replica newReplica = new ZonePartitionReplicaImpl(
                         replicaGrpId,
-                        listener,
+                        listener.apply(raftClient),
                         raftClient
                 );
 
@@ -756,7 +755,7 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
      */
     @VisibleForTesting
     @Deprecated
-    public CompletableFuture<Boolean> startReplica(
+    public CompletableFuture<Replica> startReplica(
             ReplicationGroupId replicaGrpId,
             PeersAndLearners newConfiguration,
             Consumer<RaftGroupService> updateTableRaftService,
@@ -772,8 +771,7 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
                     updateTableRaftService.accept(raftClient);
                     return createListener.apply(raftClient);
                 }, replicasCreationExecutor)
-                .thenCompose(replicaListener -> startReplica(replicaGrpId, 
storageIndexTracker, completedFuture(replicaListener)))
-                .thenApply(r -> true);
+                .thenCompose(replicaListener -> startReplica(replicaGrpId, 
storageIndexTracker, completedFuture(replicaListener)));
     }
 
     /**
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ZonePartitionReplicaImpl.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ZonePartitionReplicaImpl.java
index 5cd05a8a5e..993219518f 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ZonePartitionReplicaImpl.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ZonePartitionReplicaImpl.java
@@ -30,12 +30,11 @@ import 
org.apache.ignite.internal.replicator.message.ReplicaRequest;
  * Replica for the zone based partitions.
  */
 public class ZonePartitionReplicaImpl implements Replica {
-
     private final ReplicationGroupId replicaGrpId;
 
     private final ReplicaListener listener;
 
-    TopologyAwareRaftGroupService raftClient;
+    private final TopologyAwareRaftGroupService raftClient;
 
     /**
      * Constructor.
@@ -54,6 +53,11 @@ public class ZonePartitionReplicaImpl implements Replica {
         this.raftClient = raftClient;
     }
 
+    @Override
+    public ReplicaListener listener() {
+        return listener;
+    }
+
     @Override
     public TopologyAwareRaftGroupService raftClient() {
         return raftClient;
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteSwapRowReplicaRequest.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/TableAware.java
similarity index 63%
copy from 
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteSwapRowReplicaRequest.java
copy to 
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/TableAware.java
index 42322a70ef..91525c3abf 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteSwapRowReplicaRequest.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/TableAware.java
@@ -15,15 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.partition.replicator.network.replication;
+package org.apache.ignite.internal.replicator.message;
 
-import org.apache.ignite.internal.network.annotations.Transferable;
-import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
+import org.apache.ignite.internal.network.NetworkMessage;
 
 /**
- * Read-write dual row replica request.
+ * Generic interface for all messages that concern a specific table.
  */
-@Transferable(PartitionReplicationMessageGroup.RW_DUAL_ROW_REPLICA_REQUEST)
-public interface ReadWriteSwapRowReplicaRequest extends SwapRowReplicaRequest, 
ReadWriteReplicaRequest {
-
+public interface TableAware extends NetworkMessage {
+    /** Table Id. */
+    int tableId();
 }
diff --git 
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
 
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
index fcd13cc344..c7999187c5 100644
--- 
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
+++ 
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
@@ -180,7 +180,7 @@ public class ReplicaManagerTest extends 
BaseIgniteAbstractTest {
         String nodeName = testNodeName(testInfo, 0);
         PeersAndLearners newConfiguration = 
PeersAndLearners.fromConsistentIds(Set.of(nodeName));
 
-        CompletableFuture<Boolean> startReplicaFuture = 
replicaManager.startReplica(
+        CompletableFuture<Replica> startReplicaFuture = 
replicaManager.startReplica(
                 groupId,
                 newConfiguration,
                 (unused) -> { },
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index ad99548e69..e2d643b558 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -145,6 +145,7 @@ import 
org.apache.ignite.internal.network.configuration.NetworkConfiguration;
 import org.apache.ignite.internal.network.recovery.VaultStaleIds;
 import 
org.apache.ignite.internal.network.scalecube.TestScaleCubeClusterServiceFactory;
 import 
org.apache.ignite.internal.network.wrapper.JumpToExecutorByConsistentIdAfterSend;
+import 
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
 import org.apache.ignite.internal.placementdriver.PlacementDriverManager;
 import 
org.apache.ignite.internal.placementdriver.PrimaryReplicaAwaitTimeoutException;
@@ -654,7 +655,18 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 lowWatermark,
                 transactionInflights,
                 indexMetaStorage,
-                logSyncer
+                logSyncer,
+                new PartitionReplicaLifecycleManager(
+                        catalogManager,
+                        replicaMgr,
+                        distributionZoneManager,
+                        metaStorageMgr,
+                        clusterSvc.topologyService(),
+                        lowWatermark,
+                        threadPoolsManager.tableIoExecutor(),
+                        rebalanceScheduler,
+                        threadPoolsManager.partitionOperationsExecutor()
+                )
         );
 
         var indexManager = new IndexManager(
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 3413714268..b836ebb361 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -751,7 +751,8 @@ public class IgniteImpl implements Ignite {
                 clusterSvc.topologyService(),
                 lowWatermark,
                 threadPoolsManager.tableIoExecutor(),
-                rebalanceScheduler
+                rebalanceScheduler,
+                threadPoolsManager.partitionOperationsExecutor()
         );
 
         TransactionConfiguration txConfig = 
clusterConfigRegistry.getConfiguration(TransactionConfiguration.KEY);
@@ -829,7 +830,8 @@ public class IgniteImpl implements Ignite {
                 lowWatermark,
                 transactionInflights,
                 indexMetaStorage,
-                logSyncer
+                logSyncer,
+                partitionReplicaLifecycleManager
         );
 
         disasterRecoveryManager = new DisasterRecoveryManager(
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
index 5689c1e6e0..05fbd824d9 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
@@ -148,6 +148,7 @@ public final class UpdatableTableImpl implements 
UpdatableTable {
 
             ReplicaRequest request = 
PARTITION_REPLICATION_MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
                     .groupId(serializeTablePartitionId(partGroupId))
+                    .tableId(tableId)
                     
.commitPartitionId(serializeTablePartitionId(commitPartitionId))
                     
.schemaVersion(partToRows.getValue().get(0).schemaVersion())
                     .binaryTuples(binaryRowsToBuffers(partToRows.getValue()))
@@ -253,6 +254,7 @@ public final class UpdatableTableImpl implements 
UpdatableTable {
 
             ReadWriteMultiRowReplicaRequest request = 
PARTITION_REPLICATION_MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
                     .groupId(serializeTablePartitionId(partGroupId))
+                    .tableId(tableId)
                     
.commitPartitionId(serializeTablePartitionId(commitPartitionId))
                     
.schemaVersion(rowBatch.requestedRows.get(0).schemaVersion())
                     .binaryTuples(binaryRowsToBuffers(rowBatch.requestedRows))
@@ -322,6 +324,7 @@ public final class UpdatableTableImpl implements 
UpdatableTable {
 
             ReplicaRequest request = 
PARTITION_REPLICATION_MESSAGES_FACTORY.readWriteMultiRowPkReplicaRequest()
                     .groupId(serializeTablePartitionId(partGroupId))
+                    .tableId(tableId)
                     
.commitPartitionId(serializeTablePartitionId(commitPartitionId))
                     
.schemaVersion(partToRows.getValue().get(0).schemaVersion())
                     .primaryKeys(serializePrimaryKeys(partToRows.getValue()))
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
index cf81093dec..51c6fa87b4 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
@@ -113,6 +113,8 @@ import org.junit.jupiter.api.extension.ExtendWith;
  */
 @ExtendWith(ConfigurationExtension.class)
 public class ReplicaUnavailableTest extends IgniteAbstractTest {
+    private static final int TABLE_ID = 1;
+
     private static final String NODE_NAME = "client";
 
     private static final SchemaDescriptor SCHEMA = new SchemaDescriptor(
@@ -222,7 +224,7 @@ public class ReplicaUnavailableTest extends 
IgniteAbstractTest {
     public void testWithReplicaStartedAfterRequestSending() throws Exception {
         ClusterNode clusterNode = 
clusterService.topologyService().localMember();
 
-        TablePartitionId tablePartitionId = new TablePartitionId(1, 1);
+        TablePartitionId tablePartitionId = new TablePartitionId(TABLE_ID, 1);
 
         ReadWriteSingleRowReplicaRequest request = 
getRequest(tablePartitionId);
 
@@ -266,6 +268,7 @@ public class ReplicaUnavailableTest extends 
IgniteAbstractTest {
 
         return tableMessagesFactory.readWriteSingleRowReplicaRequest()
                 .groupId(toTablePartitionIdMessage(replicaMessageFactory, 
tablePartitionId))
+                .tableId(TABLE_ID)
                 .transactionId(TestTransactionIds.newTransactionId())
                 .commitPartitionId(tablePartitionId())
                 .timestamp(clock.now())
@@ -281,7 +284,7 @@ public class ReplicaUnavailableTest extends 
IgniteAbstractTest {
     public void testStopReplicaException() {
         ClusterNode clusterNode = 
clusterService.topologyService().localMember();
 
-        TablePartitionId tablePartitionId = new TablePartitionId(1, 1);
+        TablePartitionId tablePartitionId = new TablePartitionId(TABLE_ID, 1);
 
         ReadWriteSingleRowReplicaRequest request = 
getRequest(tablePartitionId);
 
@@ -316,7 +319,7 @@ public class ReplicaUnavailableTest extends 
IgniteAbstractTest {
     public void testWithNotStartedReplica() {
         ClusterNode clusterNode = 
clusterService.topologyService().localMember();
 
-        TablePartitionId tablePartitionId = new TablePartitionId(1, 1);
+        TablePartitionId tablePartitionId = new TablePartitionId(TABLE_ID, 1);
 
         ReadWriteSingleRowReplicaRequest request = 
getRequest(tablePartitionId);
 
@@ -346,7 +349,7 @@ public class ReplicaUnavailableTest extends 
IgniteAbstractTest {
     public void testWithNotReadyReplica() {
         ClusterNode clusterNode = 
clusterService.topologyService().localMember();
 
-        TablePartitionId tablePartitionId = new TablePartitionId(1, 1);
+        TablePartitionId tablePartitionId = new TablePartitionId(TABLE_ID, 1);
 
         PeersAndLearners newConfiguration = 
PeersAndLearners.fromConsistentIds(Set.of(clusterNode.name()));
 
@@ -397,7 +400,7 @@ public class ReplicaUnavailableTest extends 
IgniteAbstractTest {
 
     private TablePartitionIdMessage tablePartitionId() {
         return replicaMessageFactory.tablePartitionIdMessage()
-                .tableId(1)
+                .tableId(TABLE_ID)
                 .partitionId(1)
                 .build();
     }
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 9dae96eafd..2eecfa4732 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -154,6 +154,7 @@ import 
org.apache.ignite.internal.network.configuration.NetworkConfiguration;
 import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
 import 
org.apache.ignite.internal.pagememory.configuration.schema.PersistentPageMemoryProfileConfigurationSchema;
 import 
org.apache.ignite.internal.pagememory.configuration.schema.VolatilePageMemoryProfileConfigurationSchema;
+import 
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
 import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
 import org.apache.ignite.internal.raft.Loza;
@@ -625,7 +626,7 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
     }
 
     /**
-     * Test checks rebances from [A,B,C] to [A,B] and then again to [A,B,C].
+     * Test checks rebalances from [A,B,C] to [A,B] and then again to [A,B,C].
      * In this case the raft group node and {@link Replica} are started only 
once on each node.
      *
      * <p>1. We have an in-progress rebalance and current metastore keys:
@@ -1351,7 +1352,18 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
                     lowWatermark,
                     transactionInflights,
                     indexMetaStorage,
-                    logSyncer
+                    logSyncer,
+                    new PartitionReplicaLifecycleManager(
+                            catalogManager,
+                            replicaManager,
+                            distributionZoneManager,
+                            metaStorageManager,
+                            clusterService.topologyService(),
+                            lowWatermark,
+                            threadPoolsManager.tableIoExecutor(),
+                            rebalanceScheduler,
+                            threadPoolsManager.partitionOperationsExecutor()
+                    )
             ) {
                 @Override
                 protected TxStateTableStorage createTxStateTableStorage(
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 392ef322af..e2aa8d3e5e 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -148,6 +148,8 @@ import 
org.apache.ignite.internal.metastorage.dsl.SimpleCondition;
 import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.network.TopologyService;
 import 
org.apache.ignite.internal.network.serialization.MessageSerializationRegistry;
+import 
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
+import 
org.apache.ignite.internal.partition.replicator.ZonePartitionReplicaListener;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
 import 
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
@@ -156,6 +158,7 @@ import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.PeersAndLearners;
 import org.apache.ignite.internal.raft.RaftGroupEventsListener;
 import org.apache.ignite.internal.raft.service.LeaderWithTerm;
+import org.apache.ignite.internal.raft.service.RaftCommandRunner;
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.raft.storage.SnapshotStorageFactory;
@@ -165,6 +168,7 @@ import 
org.apache.ignite.internal.replicator.ReplicaManager.WeakReplicaStopReaso
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
 import org.apache.ignite.internal.replicator.listener.ReplicaListener;
 import org.apache.ignite.internal.schema.SchemaManager;
 import org.apache.ignite.internal.schema.SchemaRegistry;
@@ -408,6 +412,8 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
 
     private final String nodeName;
 
+    private final PartitionReplicaLifecycleManager 
partitionReplicaLifecycleManager;
+
     private long implicitTransactionTimeout;
 
     private int attemptsObtainLock;
@@ -446,6 +452,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
      * @param lowWatermark Low watermark.
      * @param transactionInflights Transaction inflights.
      * @param indexMetaStorage Index meta storage.
+     * @param partitionReplicaLifecycleManager Partition replica lifecycle 
manager.
      */
     public TableManager(
             String nodeName,
@@ -480,7 +487,8 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
             LowWatermark lowWatermark,
             TransactionInflights transactionInflights,
             IndexMetaStorage indexMetaStorage,
-            LogSyncer logSyncer
+            LogSyncer logSyncer,
+            PartitionReplicaLifecycleManager partitionReplicaLifecycleManager
     ) {
         this.topologyService = topologyService;
         this.replicaMgr = replicaMgr;
@@ -507,6 +515,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
         this.txCfg = txCfg;
         this.nodeName = nodeName;
         this.indexMetaStorage = indexMetaStorage;
+        this.partitionReplicaLifecycleManager = 
partitionReplicaLifecycleManager;
 
         this.executorInclinedSchemaSyncService = new 
ExecutorInclinedSchemaSyncService(schemaSyncService, 
partitionOperationsExecutor);
         this.executorInclinedPlacementDriver = new 
ExecutorInclinedPlacementDriver(placementDriver, partitionOperationsExecutor);
@@ -610,6 +619,8 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
             
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(ASSIGNMENTS_SWITCH_REDUCE_PREFIX),
 assignmentsSwitchRebalanceListener);
 
             catalogService.listen(CatalogEvent.TABLE_CREATE, parameters -> 
onTableCreate((CreateTableEventParameters) parameters));
+            catalogService.listen(CatalogEvent.TABLE_CREATE, parameters ->
+                    
prepareTableResourcesAndLoadToZoneReplica((CreateTableEventParameters) 
parameters));
             catalogService.listen(CatalogEvent.TABLE_DROP, 
fromConsumer(this::onTableDrop));
             catalogService.listen(CatalogEvent.TABLE_ALTER, parameters -> {
                 if (parameters instanceof RenameTableEventParameters) {
@@ -635,6 +646,133 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         });
     }
 
+    private CompletableFuture<Boolean> 
prepareTableResourcesAndLoadToZoneReplica(CreateTableEventParameters 
parameters) {
+        if (!PartitionReplicaLifecycleManager.ENABLED) {
+            return completedFuture(false);
+        }
+
+        long causalityToken = parameters.causalityToken();
+        CatalogTableDescriptor tableDescriptor = parameters.tableDescriptor();
+        CatalogZoneDescriptor zoneDescriptor = 
getZoneDescriptor(tableDescriptor, parameters.catalogVersion());
+
+        TableImpl table =  createTableImpl(causalityToken, tableDescriptor, 
zoneDescriptor);
+
+        tablesVv.update(causalityToken, (ignore, e) -> inBusyLock(busyLock, () 
-> {
+            if (e != null) {
+                return failedFuture(e);
+            }
+
+            return schemaManager.schemaRegistry(causalityToken, 
parameters.tableId()).thenAccept(table::schemaView);
+        }));
+
+        // NB: all vv.update() calls must be made from the synchronous part of 
the method (not in thenCompose()/etc!).
+        CompletableFuture<?> localPartsUpdateFuture = 
localPartitionsVv.update(causalityToken,
+                (ignore, throwable) -> inBusyLock(busyLock, () -> 
nullCompletedFuture().thenComposeAsync((ignored) -> {
+                    PartitionSet parts = new BitSetPartitionSet();
+
+                    for (int i = 0; i < zoneDescriptor.partitions(); i++) {
+                        if 
(partitionReplicaLifecycleManager.hasLocalPartition(new 
ZonePartitionId(zoneDescriptor.id(), i))) {
+                            parts.set(i);
+                        }
+                    }
+
+                    return getOrCreatePartitionStorages(table, 
parts).thenAccept(u -> localPartsByTableId.put(parameters.tableId(), parts));
+                }, ioExecutor)));
+
+        CompletableFuture<?> tablesByIdFuture = tablesVv.get(causalityToken);
+
+        CompletableFuture<?> createPartsFut = 
assignmentsUpdatedVv.update(causalityToken, (token, e) -> {
+            if (e != null) {
+                return failedFuture(e);
+            }
+
+            return allOf(localPartsUpdateFuture, 
tablesByIdFuture).thenComposeAsync(ignore -> inBusyLock(busyLock, () -> {
+                        var startPartsFut = new 
ArrayList<CompletableFuture<?>>();
+
+                        for (int i = 0; i < zoneDescriptor.partitions(); i++) {
+                            if 
(partitionReplicaLifecycleManager.hasLocalPartition(new 
ZonePartitionId(zoneDescriptor.id(), i))) {
+                                
startPartsFut.add(preparePartitionResourcesAndLoadToZoneReplica(
+                                        table,
+                                        i,
+                                        zoneDescriptor.id()));
+                            }
+                        }
+
+                        return 
allOf(startPartsFut.toArray(CompletableFuture[]::new));
+                    }
+            ), ioExecutor);
+        });
+
+        tables.put(parameters.tableId(), table);
+
+        // TODO: https://issues.apache.org/jira/browse/IGNITE-19913 Possible 
performance degradation.
+        return createPartsFut.thenAccept(ignore -> 
startedTables.put(parameters.tableId(), table))
+                .thenApply(unused -> false);
+
+    }
+
+    /**
+     * Prepare the table partition resources and load them to the zone-based 
replica.
+     *
+     * @param table Table.
+     * @param partId Partition id.
+     * @param zoneId Zone id.
+     * @return Future, which will complete when the table processor is loaded to 
the zone replica.
+     */
+    private CompletableFuture<Void> 
preparePartitionResourcesAndLoadToZoneReplica(
+            TableImpl table,
+            int partId,
+            int zoneId
+    ) {
+        int tableId = table.tableId();
+
+        var internalTbl = (InternalTableImpl) table.internalTable();
+
+        TablePartitionId replicaGrpId = new TablePartitionId(tableId, partId);
+
+        return inBusyLockAsync(busyLock, () -> {
+            var safeTimeTracker = new 
PendingComparableValuesTracker<HybridTimestamp, 
Void>(HybridTimestamp.MIN_VALUE);
+
+            var storageIndexTracker = new PendingComparableValuesTracker<Long, 
Void>(0L);
+
+            PartitionStorages partitionStorages = getPartitionStorages(table, 
partId);
+
+            PartitionDataStorage partitionDataStorage = 
partitionDataStorage(partitionStorages.getMvPartitionStorage(),
+                    internalTbl, partId);
+
+            
storageIndexTracker.update(partitionDataStorage.lastAppliedIndex(), null);
+
+            PartitionUpdateHandlers partitionUpdateHandlers = 
createPartitionUpdateHandlers(
+                    partId,
+                    partitionDataStorage,
+                    table,
+                    safeTimeTracker,
+                    storageUpdateConfig
+            );
+
+            internalTbl.updatePartitionTrackers(partId, safeTimeTracker, 
storageIndexTracker);
+
+            mvGc.addStorage(replicaGrpId, 
partitionUpdateHandlers.gcUpdateHandler);
+
+            Function<RaftCommandRunner, ReplicaListener> createListener = 
(raftClient) -> createReplicaListener(
+                    replicaGrpId,
+                    table,
+                    safeTimeTracker,
+                    partitionStorages.getMvPartitionStorage(),
+                    partitionStorages.getTxStateStorage(),
+                    partitionUpdateHandlers,
+                    raftClient);
+
+            return replicaMgr.replica(new ZonePartitionId(zoneId, partId))
+                    .thenAcceptAsync(zoneReplica ->
+                            ((ZonePartitionReplicaListener) 
zoneReplica.listener()).addTableReplicaListener(
+                                    new TablePartitionId(tableId, partId), 
createListener
+                            ), ioExecutor
+                    );
+        });
+    }
+
+
     private CompletableFuture<Boolean> 
onPrimaryReplicaExpired(PrimaryReplicaEventParameters parameters) {
         if 
(topologyService.localMember().id().equals(parameters.leaseholderId())) {
             TablePartitionId groupId = (TablePartitionId) parameters.groupId();
@@ -998,7 +1136,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
                                 createListener,
                                 storageIndexTracker,
                                 replicaGrpId,
-                                stablePeersAndLearners);
+                                stablePeersAndLearners).thenApply(ignored -> 
true);
                     } catch (NodeStoppingException e) {
                         throw new AssertionError("Loza was stopped before 
Table manager", e);
                     }
@@ -1054,7 +1192,7 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
             MvPartitionStorage mvPartitionStorage,
             TxStateStorage txStatePartitionStorage,
             PartitionUpdateHandlers partitionUpdateHandlers,
-            RaftGroupService raftClient
+            RaftCommandRunner raftClient
     ) {
         int tableId = tablePartitionId.tableId();
         int partId = tablePartitionId.partitionId();
@@ -1199,6 +1337,66 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         }
     }
 
+    /**
+     * Create table storages and internal instance.
+     *
+     * @param causalityToken Causality token.
+     * @param tableDescriptor Catalog table descriptor.
+     * @param zoneDescriptor Catalog distribution zone descriptor.
+     * @return Table instance.
+     */
+    private TableImpl createTableImpl(
+            long causalityToken,
+            CatalogTableDescriptor tableDescriptor,
+            CatalogZoneDescriptor zoneDescriptor
+    ) {
+        String tableName = tableDescriptor.name();
+
+        LOG.trace("Creating local table: name={}, id={}, token={}", 
tableDescriptor.name(), tableDescriptor.id(), causalityToken);
+
+        MvTableStorage tableStorage = createTableStorage(tableDescriptor, 
zoneDescriptor);
+        TxStateTableStorage txStateStorage = 
createTxStateTableStorage(tableDescriptor, zoneDescriptor);
+
+        int partitions = zoneDescriptor.partitions();
+
+        TableRaftServiceImpl tableRaftService = new TableRaftServiceImpl(
+                tableName,
+                partitions,
+                new Int2ObjectOpenHashMap<>(partitions),
+                topologyService
+        );
+
+        InternalTableImpl internalTable = new InternalTableImpl(
+                tableName,
+                tableDescriptor.id(),
+                partitions,
+                topologyService,
+                txManager,
+                tableStorage,
+                txStateStorage,
+                replicaSvc,
+                clock,
+                observableTimestampTracker,
+                executorInclinedPlacementDriver,
+                tableRaftService,
+                transactionInflights,
+                implicitTransactionTimeout,
+                attemptsObtainLock,
+                this::streamerFlushExecutor,
+                Objects.requireNonNull(streamerReceiverRunner)
+        );
+
+        return new TableImpl(
+                internalTable,
+                lockMgr,
+                schemaVersions,
+                marshallers,
+                sql.get(),
+                tableDescriptor.primaryKeyIndexId()
+        );
+    }
+
+
     /**
      * Creates local structures for a table.
      *
@@ -1257,51 +1455,9 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
             CompletableFuture<List<Assignments>> assignmentsFuture,
             boolean onNodeRecovery
     ) {
-        String tableName = tableDescriptor.name();
-        int tableId = tableDescriptor.id();
+        TableImpl table =  createTableImpl(causalityToken, tableDescriptor, 
zoneDescriptor);
 
-        LOG.trace("Creating local table: name={}, id={}, token={}", 
tableDescriptor.name(), tableDescriptor.id(), causalityToken);
-
-        MvTableStorage tableStorage = createTableStorage(tableDescriptor, 
zoneDescriptor);
-        TxStateTableStorage txStateStorage = 
createTxStateTableStorage(tableDescriptor, zoneDescriptor);
-
-        int partitions = zoneDescriptor.partitions();
-
-        TableRaftServiceImpl tableRaftService = new TableRaftServiceImpl(
-                tableName,
-                partitions,
-                new Int2ObjectOpenHashMap<>(partitions),
-                topologyService
-        );
-
-        InternalTableImpl internalTable = new InternalTableImpl(
-                tableName,
-                tableId,
-                partitions,
-                topologyService,
-                txManager,
-                tableStorage,
-                txStateStorage,
-                replicaSvc,
-                clock,
-                observableTimestampTracker,
-                executorInclinedPlacementDriver,
-                tableRaftService,
-                transactionInflights,
-                implicitTransactionTimeout,
-                attemptsObtainLock,
-                this::streamerFlushExecutor,
-                Objects.requireNonNull(streamerReceiverRunner)
-        );
-
-        var table = new TableImpl(
-                internalTable,
-                lockMgr,
-                schemaVersions,
-                marshallers,
-                sql.get(),
-                tableDescriptor.primaryKeyIndexId()
-        );
+        int tableId = tableDescriptor.id();
 
         tablesVv.update(causalityToken, (ignore, e) -> inBusyLock(busyLock, () 
-> {
             if (e != null) {
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 49b0df8f9a..fe5e7559cf 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -561,6 +561,7 @@ public class InternalTableImpl implements InternalTable {
         Function<Long, ReplicaRequest> mapFunc =
                 (enlistmentConsistencyToken) -> 
TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
                         .groupId(serializeTablePartitionId(partGroupId))
+                        .tableId(tableId)
                         .timestamp(clock.now())
                         .transactionId(tx.id())
                         .scanId(scanId)
@@ -870,6 +871,7 @@ public class InternalTableImpl implements InternalTable {
                     keyRow,
                     (groupId, consistencyToken) -> 
TABLE_MESSAGES_FACTORY.readOnlyDirectSingleRowReplicaRequest()
                             .groupId(serializeTablePartitionId(groupId))
+                            .tableId(tableId)
                             .enlistmentConsistencyToken(consistencyToken)
                             .schemaVersion(keyRow.schemaVersion())
                             .primaryKey(keyRow.tupleSlice())
@@ -888,6 +890,7 @@ public class InternalTableImpl implements InternalTable {
                 tx,
                 (txo, groupId, enlistmentConsistencyToken) -> 
TABLE_MESSAGES_FACTORY.readWriteSingleRowPkReplicaRequest()
                         .groupId(serializeTablePartitionId(groupId))
+                        .tableId(tableId)
                         .schemaVersion(keyRow.schemaVersion())
                         .primaryKey(keyRow.tupleSlice())
                         
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
@@ -913,6 +916,7 @@ public class InternalTableImpl implements InternalTable {
 
         return replicaSvc.invoke(recipientNode, 
TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest()
                 .groupId(serializeTablePartitionId(tablePartitionId))
+                .tableId(tableId)
                 .schemaVersion(keyRow.schemaVersion())
                 .primaryKey(keyRow.tupleSlice())
                 .requestTypeInt(RO_GET.ordinal())
@@ -955,6 +959,7 @@ public class InternalTableImpl implements InternalTable {
                     keyRows,
                     (groupId, consistencyToken) -> 
TABLE_MESSAGES_FACTORY.readOnlyDirectMultiRowReplicaRequest()
                             .groupId(serializeTablePartitionId(groupId))
+                            .tableId(tableId)
                             .enlistmentConsistencyToken(consistencyToken)
                             
.schemaVersion(keyRows.iterator().next().schemaVersion())
                             .primaryKeys(serializeBinaryTuples(keyRows))
@@ -994,6 +999,7 @@ public class InternalTableImpl implements InternalTable {
 
             ReadOnlyMultiRowPkReplicaRequest request = 
TABLE_MESSAGES_FACTORY.readOnlyMultiRowPkReplicaRequest()
                     .groupId(serializeTablePartitionId(tablePartitionId))
+                    .tableId(tableId)
                     
.schemaVersion(partitionRowBatch.getValue().requestedRows.get(0).schemaVersion())
                     
.primaryKeys(serializeBinaryTuples(partitionRowBatch.getValue().requestedRows))
                     .requestTypeInt(RO_GET_ALL.ordinal())
@@ -1018,6 +1024,7 @@ public class InternalTableImpl implements InternalTable {
 
         return TABLE_MESSAGES_FACTORY.readWriteMultiRowPkReplicaRequest()
                 .groupId(serializeTablePartitionId(groupId))
+                .tableId(tableId)
                 
.commitPartitionId(serializeTablePartitionId(tx.commitPartition()))
                 .schemaVersion(rows.iterator().next().schemaVersion())
                 .primaryKeys(serializeBinaryTuples(rows))
@@ -1086,6 +1093,7 @@ public class InternalTableImpl implements InternalTable {
                 tx,
                 (txo, groupId, enlistmentConsistencyToken) -> 
TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
                         .groupId(serializeTablePartitionId(groupId))
+                        .tableId(tableId)
                         
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
                         .schemaVersion(row.schemaVersion())
                         .binaryTuple(row.tupleSlice())
@@ -1170,6 +1178,7 @@ public class InternalTableImpl implements InternalTable {
                 tx,
                 (txo, groupId, enlistmentConsistencyToken) -> 
TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
                         .groupId(serializeTablePartitionId(groupId))
+                        .tableId(tableId)
                         
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
                         .schemaVersion(row.schemaVersion())
                         .binaryTuple(row.tupleSlice())
@@ -1192,6 +1201,7 @@ public class InternalTableImpl implements InternalTable {
                 tx,
                 (txo, groupId, enlistmentConsistencyToken) -> 
TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
                         .groupId(serializeTablePartitionId(groupId))
+                        .tableId(tableId)
                         
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
                         .schemaVersion(row.schemaVersion())
                         .binaryTuple(row.tupleSlice())
@@ -1248,6 +1258,7 @@ public class InternalTableImpl implements InternalTable {
 
         return TABLE_MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
                 .groupId(serializeTablePartitionId(groupId))
+                .tableId(tableId)
                 
.commitPartitionId(serializeTablePartitionId(tx.commitPartition()))
                 .schemaVersion(rows.iterator().next().schemaVersion())
                 .binaryTuples(serializeBinaryTuples(rows))
@@ -1269,6 +1280,7 @@ public class InternalTableImpl implements InternalTable {
                 tx,
                 (txo, groupId, enlistmentConsistencyToken) -> 
TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
                         .groupId(serializeTablePartitionId(groupId))
+                        .tableId(tableId)
                         
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
                         .schemaVersion(row.schemaVersion())
                         .binaryTuple(row.tupleSlice())
@@ -1294,6 +1306,7 @@ public class InternalTableImpl implements InternalTable {
                 tx,
                 (txo, groupId, enlistmentConsistencyToken) -> 
TABLE_MESSAGES_FACTORY.readWriteSwapRowReplicaRequest()
                         .groupId(serializeTablePartitionId(groupId))
+                        .tableId(tableId)
                         
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
                         .schemaVersion(oldRow.schemaVersion())
                         .oldBinaryTuple(oldRow.tupleSlice())
@@ -1317,6 +1330,7 @@ public class InternalTableImpl implements InternalTable {
                 tx,
                 (txo, groupId, enlistmentConsistencyToken) -> 
TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
                         .groupId(serializeTablePartitionId(groupId))
+                        .tableId(tableId)
                         
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
                         .schemaVersion(row.schemaVersion())
                         .binaryTuple(row.tupleSlice())
@@ -1339,6 +1353,7 @@ public class InternalTableImpl implements InternalTable {
                 tx,
                 (txo, groupId, enlistmentConsistencyToken) -> 
TABLE_MESSAGES_FACTORY.readWriteSingleRowPkReplicaRequest()
                         .groupId(serializeTablePartitionId(groupId))
+                        .tableId(tableId)
                         
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
                         .schemaVersion(keyRow.schemaVersion())
                         .primaryKey(keyRow.tupleSlice())
@@ -1361,6 +1376,7 @@ public class InternalTableImpl implements InternalTable {
                 tx,
                 (txo, groupId, enlistmentConsistencyToken) -> 
TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
                         .groupId(serializeTablePartitionId(groupId))
+                        .tableId(tableId)
                         
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
                         .schemaVersion(oldRow.schemaVersion())
                         .binaryTuple(oldRow.tupleSlice())
@@ -1383,6 +1399,7 @@ public class InternalTableImpl implements InternalTable {
                 tx,
                 (txo, groupId, enlistmentConsistencyToken) -> 
TABLE_MESSAGES_FACTORY.readWriteSingleRowPkReplicaRequest()
                         .groupId(serializeTablePartitionId(groupId))
+                        .tableId(tableId)
                         
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
                         .schemaVersion(row.schemaVersion())
                         .primaryKey(row.tupleSlice())
@@ -1582,6 +1599,7 @@ public class InternalTableImpl implements InternalTable {
                 (scanId, batchSize) -> {
                     ReadOnlyScanRetrieveBatchReplicaRequest request = 
TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
                             
.groupId(serializeTablePartitionId(tablePartitionId))
+                            .tableId(tableId)
                             .readTimestamp(readTimestamp)
                             .transactionId(txId)
                             .scanId(scanId)
@@ -1691,6 +1709,7 @@ public class InternalTableImpl implements InternalTable {
                 (scanId, batchSize) -> {
                     ReadWriteScanRetrieveBatchReplicaRequest request = 
TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
                             
.groupId(serializeTablePartitionId(tablePartitionId))
+                            .tableId(tableId)
                             .timestamp(clock.now())
                             .transactionId(txId)
                             .scanId(scanId)
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index cb53d8ee1a..c401bf50d8 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -55,6 +55,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.Consumer;
 import java.util.function.LongFunction;
@@ -86,6 +87,7 @@ import org.apache.ignite.internal.network.ClusterNodeImpl;
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.network.TopologyService;
+import 
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
 import org.apache.ignite.internal.raft.Loza;
@@ -351,7 +353,18 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
                 lowWatermark,
                 new TransactionInflights(placementDriver, clockService),
                 indexMetaStorage,
-                logSyncer
+                logSyncer,
+                new PartitionReplicaLifecycleManager(
+                        catalogManager,
+                        replicaMgr,
+                        distributionZoneManager,
+                        metaStorageManager,
+                        topologyService,
+                        lowWatermark,
+                        ForkJoinPool.commonPool(),
+                        mock(ScheduledExecutorService.class),
+                        partitionOperationsExecutor
+                )
         ) {
 
             @Override
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 3d6492e005..7c291b4296 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -104,6 +104,7 @@ import org.apache.ignite.internal.network.ClusterNodeImpl;
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.network.TopologyService;
+import 
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
 import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
 import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
@@ -263,6 +264,10 @@ public class TableManagerTest extends IgniteAbstractTest {
 
     private IndexMetaStorage indexMetaStorage;
 
+    /** Partition replica lifecycle manager. */
+    @Mock
+    private PartitionReplicaLifecycleManager partitionReplicaLifecycleManager;
+
     @BeforeEach
     void before() throws NodeStoppingException {
         lowWatermark = new TestLowWatermark();
@@ -290,7 +295,7 @@ public class TableManagerTest extends IgniteAbstractTest {
         when(distributionZoneManager.dataNodes(anyLong(), anyInt(), 
anyInt())).thenReturn(emptySetCompletedFuture());
 
         when(replicaMgr.startReplica(any(), any(), anyBoolean(), any(), any(), 
any(), any(), any(), any()))
-                .thenReturn(trueCompletedFuture());
+                .thenReturn(nullCompletedFuture());
         when(replicaMgr.stopReplica(any())).thenReturn(trueCompletedFuture());
         when(replicaMgr.weakStartReplica(any(), any(), any())).thenAnswer(inv 
-> {
             Supplier<CompletableFuture<Void>> startOperation = 
inv.getArgument(1);
@@ -832,7 +837,8 @@ public class TableManagerTest extends IgniteAbstractTest {
                 lowWatermark,
                 mock(TransactionInflights.class),
                 indexMetaStorage,
-                logSyncer
+                logSyncer,
+                partitionReplicaLifecycleManager
         ) {
 
             @Override
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index d344ceac12..8855f160cb 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -337,6 +337,7 @@ public class PartitionReplicaListenerIndexLockingTest 
extends IgniteAbstractTest
             case RW_GET_AND_DELETE:
                 request = 
TABLE_MESSAGES_FACTORY.readWriteSingleRowPkReplicaRequest()
                         .groupId(tablePartitionId(PARTITION_ID))
+                        .tableId(TABLE_ID)
                         .enlistmentConsistencyToken(1L)
                         .commitPartitionId(tablePartitionId(PARTITION_ID))
                         .transactionId(TRANSACTION_ID)
@@ -357,6 +358,7 @@ public class PartitionReplicaListenerIndexLockingTest 
extends IgniteAbstractTest
             case RW_GET_AND_UPSERT:
                 request = 
TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
                         .groupId(tablePartitionId(PARTITION_ID))
+                        .tableId(TABLE_ID)
                         .enlistmentConsistencyToken(1L)
                         .commitPartitionId(tablePartitionId(PARTITION_ID))
                         .transactionId(TRANSACTION_ID)
@@ -425,6 +427,7 @@ public class PartitionReplicaListenerIndexLockingTest 
extends IgniteAbstractTest
             case RW_DELETE_ALL:
                 request = 
TABLE_MESSAGES_FACTORY.readWriteMultiRowPkReplicaRequest()
                         .groupId(tablePartitionId(PARTITION_ID))
+                        .tableId(TABLE_ID)
                         .enlistmentConsistencyToken(1L)
                         .commitPartitionId(tablePartitionId(PARTITION_ID))
                         .transactionId(TRANSACTION_ID)
@@ -442,6 +445,7 @@ public class PartitionReplicaListenerIndexLockingTest 
extends IgniteAbstractTest
             case RW_UPSERT_ALL:
                 request = 
TABLE_MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
                         .groupId(tablePartitionId(PARTITION_ID))
+                        .tableId(TABLE_ID)
                         .enlistmentConsistencyToken(1L)
                         .commitPartitionId(tablePartitionId(PARTITION_ID))
                         .transactionId(TRANSACTION_ID)
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 89ea604cd5..83738b6489 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -801,6 +801,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     private CompletableFuture<ReplicaResult> doReadOnlySingleGet(BinaryRow pk, 
HybridTimestamp readTimestamp) {
         ReadOnlySingleRowPkReplicaRequest request = 
TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest()
                 .groupId(tablePartitionIdMessage(grpId))
+                .tableId(TABLE_ID)
                 .readTimestamp(readTimestamp)
                 .schemaVersion(pk.schemaVersion())
                 .primaryKey(pk.tupleSlice())
@@ -813,6 +814,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     private CompletableFuture<ReplicaResult> 
doReadOnlyDirectSingleGet(BinaryRow pk) {
         ReadOnlyDirectSingleRowReplicaRequest request = 
TABLE_MESSAGES_FACTORY.readOnlyDirectSingleRowReplicaRequest()
                 .groupId(tablePartitionIdMessage(grpId))
+                .tableId(TABLE_ID)
                 .schemaVersion(pk.schemaVersion())
                 .primaryKey(pk.tupleSlice())
                 .requestTypeInt(RO_GET.ordinal())
@@ -921,6 +923,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         CompletableFuture<ReplicaResult> fut = partitionReplicaListener.invoke(
                 
TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
                         .groupId(tablePartitionIdMessage(grpId))
+                        .tableId(TABLE_ID)
                         .transactionId(scanTxId)
                         .timestamp(clock.now())
                         
.enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
@@ -940,6 +943,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         // Request second batch
         fut = 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
                 .groupId(tablePartitionIdMessage(grpId))
+                .tableId(TABLE_ID)
                 .transactionId(scanTxId)
                 .timestamp(clock.now())
                 .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
@@ -959,6 +963,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         // Request bounded.
         fut = 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
                 .groupId(tablePartitionIdMessage(grpId))
+                .tableId(TABLE_ID)
                 .transactionId(newTxId())
                 .timestamp(clock.now())
                 .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
@@ -981,6 +986,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         // Empty result.
         fut = 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
                 .groupId(tablePartitionIdMessage(grpId))
+                .tableId(TABLE_ID)
                 .transactionId(newTxId())
                 .timestamp(clock.now())
                 .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
@@ -1001,6 +1007,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         // Lookup.
         fut = 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
                 .groupId(tablePartitionIdMessage(grpId))
+                .tableId(TABLE_ID)
                 .transactionId(newTxId())
                 .timestamp(clock.now())
                 .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
@@ -1044,6 +1051,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         CompletableFuture<ReplicaResult> fut = partitionReplicaListener.invoke(
                 
TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
                         .groupId(tablePartitionIdMessage(grpId))
+                        .tableId(TABLE_ID)
                         .transactionId(scanTxId)
                         .readTimestamp(clock.now())
                         .scanId(1L)
@@ -1060,6 +1068,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         // Request second batch
         fut = 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
                 .groupId(tablePartitionIdMessage(grpId))
+                .tableId(TABLE_ID)
                 .transactionId(scanTxId)
                 .readTimestamp(clock.now())
                 .scanId(1L)
@@ -1076,6 +1085,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         // Request bounded.
         fut = 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
                 .groupId(tablePartitionIdMessage(grpId))
+                .tableId(TABLE_ID)
                 .transactionId(newTxId())
                 .readTimestamp(clock.now())
                 .scanId(2L)
@@ -1095,6 +1105,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         // Empty result.
         fut = 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
                 .groupId(tablePartitionIdMessage(grpId))
+                .tableId(TABLE_ID)
                 .transactionId(newTxId())
                 .readTimestamp(clock.now())
                 .scanId(2L)
@@ -1112,6 +1123,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         // Lookup.
         fut = 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
                 .groupId(tablePartitionIdMessage(grpId))
+                .tableId(TABLE_ID)
                 .transactionId(newTxId())
                 .readTimestamp(clock.now())
                 .scanId(2L)
@@ -1152,6 +1164,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         CompletableFuture<ReplicaResult> fut = partitionReplicaListener.invoke(
                 
TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
                         .groupId(tablePartitionIdMessage(grpId))
+                        .tableId(TABLE_ID)
                         .transactionId(scanTxId)
                         .readTimestamp(clock.now())
                         .scanId(1L)
@@ -1169,6 +1182,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         // Request second batch
         fut = 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
                 .groupId(tablePartitionIdMessage(grpId))
+                .tableId(TABLE_ID)
                 .transactionId(scanTxId)
                 .readTimestamp(clock.now())
                 .scanId(1L)
@@ -1186,6 +1200,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         // Empty result.
         fut = 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
                 .groupId(tablePartitionIdMessage(grpId))
+                .tableId(TABLE_ID)
                 .transactionId(newTxId())
                 .readTimestamp(clock.now())
                 .scanId(2L)
@@ -1203,6 +1218,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         // Lookup.
         fut = 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
                 .groupId(tablePartitionIdMessage(grpId))
+                .tableId(TABLE_ID)
                 .transactionId(newTxId())
                 .readTimestamp(clock.now())
                 .scanId(2L)
@@ -1329,6 +1345,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     private CompletableFuture<?> doSingleRowRequest(UUID txId, BinaryRow 
binaryRow, RequestType requestType, boolean full) {
         return 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
                         .groupId(tablePartitionIdMessage(grpId))
+                        .tableId(TABLE_ID)
                         .transactionId(txId)
                         .requestTypeInt(requestType.ordinal())
                         .schemaVersion(binaryRow.schemaVersion())
@@ -1350,6 +1367,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     private CompletableFuture<?> doSingleRowPkRequest(UUID txId, BinaryRow 
binaryRow, RequestType requestType, boolean full) {
         return 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteSingleRowPkReplicaRequest()
                         .groupId(tablePartitionIdMessage(grpId))
+                        .tableId(TABLE_ID)
                         .transactionId(txId)
                         .requestTypeInt(requestType.ordinal())
                         .schemaVersion(binaryRow.schemaVersion())
@@ -1382,6 +1400,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     private CompletableFuture<?> doMultiRowRequest(UUID txId, 
Collection<BinaryRow> binaryRows, RequestType requestType, boolean full) {
         return 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
                         .groupId(tablePartitionIdMessage(grpId))
+                        .tableId(TABLE_ID)
                         .transactionId(txId)
                         .requestTypeInt(requestType.ordinal())
                         
.schemaVersion(binaryRows.iterator().next().schemaVersion())
@@ -1407,6 +1426,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     private CompletableFuture<?> doMultiRowPkRequest(UUID txId, 
Collection<BinaryRow> binaryRows, RequestType requestType, boolean full) {
         return 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteMultiRowPkReplicaRequest()
                         .groupId(tablePartitionIdMessage(grpId))
+                        .tableId(TABLE_ID)
                         .transactionId(txId)
                         .requestTypeInt(requestType.ordinal())
                         
.schemaVersion(binaryRows.iterator().next().schemaVersion())
@@ -1433,6 +1453,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
 
                     return 
TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
                             .groupId(tablePartitionIdMessage(grpId))
+                            .tableId(TABLE_ID)
                             .transactionId(txId)
                             .requestTypeInt(RW_INSERT.ordinal())
                             .schemaVersion(binaryRow.schemaVersion())
@@ -1463,6 +1484,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
 
                     return 
TABLE_MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
                             .groupId(tablePartitionIdMessage(grpId))
+                            .tableId(TABLE_ID)
                             .transactionId(txId)
                             .requestTypeInt(RW_UPSERT_ALL.ordinal())
                             .schemaVersion(binaryRow0.schemaVersion())
@@ -1709,6 +1731,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
 
         TxFinishReplicaRequest commitRequest = 
TX_MESSAGES_FACTORY.txFinishReplicaRequest()
                 .groupId(tablePartitionIdMessage(grpId))
+                .commitPartitionId(tablePartitionIdMessage(grpId))
                 .txId(txId)
                 .groups(Map.of(tablePartitionIdMessage(grpId), 
localNode.name()))
                 .commit(false)
@@ -1773,6 +1796,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
 
         TxFinishReplicaRequest commitRequest = 
TX_MESSAGES_FACTORY.txFinishReplicaRequest()
                 .groupId(tablePartitionIdMessage(grpId))
+                .commitPartitionId(tablePartitionIdMessage(grpId))
                 .txId(txId)
                 .groups(Map.of(tablePartitionIdMessage(grpId), 
localNode.name()))
                 .commit(true)
@@ -1951,6 +1975,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     private CompletableFuture<?> doReplaceRequest(UUID targetTxId, BinaryRow 
oldRow, BinaryRow newRow, boolean full) {
         return 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteSwapRowReplicaRequest()
                         .groupId(tablePartitionIdMessage(grpId))
+                        .tableId(TABLE_ID)
                         .transactionId(targetTxId)
                         .requestTypeInt(RW_REPLACE.ordinal())
                         .schemaVersion(oldRow.schemaVersion())
@@ -1972,6 +1997,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 (targetTxId, key) -> partitionReplicaListener.invoke(
                         
TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
                                 .groupId(tablePartitionIdMessage(grpId))
+                                .tableId(TABLE_ID)
                                 .transactionId(targetTxId)
                                 .indexToUse(sortedIndexStorage.id())
                                 
.exactKey(toIndexKey(FUTURE_SCHEMA_ROW_INDEXED_VALUE))
@@ -1993,6 +2019,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 (targetTxId, key) -> partitionReplicaListener.invoke(
                         
TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
                                 .groupId(tablePartitionIdMessage(grpId))
+                                .tableId(TABLE_ID)
                                 .transactionId(targetTxId)
                                 .indexToUse(sortedIndexStorage.id())
                                 
.enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
@@ -2018,6 +2045,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         return partitionReplicaListener.invoke(
                 
TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
                         .groupId(tablePartitionIdMessage(grpId))
+                        .tableId(TABLE_ID)
                         .transactionId(targetTxId)
                         
.enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
                         .scanId(1)
@@ -2046,6 +2074,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         return partitionReplicaListener.invoke(
                 
TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
                         .groupId(tablePartitionIdMessage(grpId))
+                        .tableId(TABLE_ID)
                         .transactionId(targetTxId)
                         .scanId(1)
                         .batchSize(100)
@@ -2519,6 +2548,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         CompletableFuture<?> future = partitionReplicaListener.invoke(
                 TX_MESSAGES_FACTORY.txFinishReplicaRequest()
                         .groupId(tablePartitionIdMessage(commitPartitionId))
+                        
.commitPartitionId(tablePartitionIdMessage(commitPartitionId))
                         .groups(groups.entrySet().stream().collect(toMap(e -> 
tablePartitionIdMessage(e.getKey()), Map.Entry::getValue)))
                         .txId(txId)
                         
.enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
@@ -2668,6 +2698,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     private CompletableFuture<ReplicaResult> upsertAsync(UUID txId, BinaryRow 
row, boolean full) {
         ReadWriteSingleRowReplicaRequest message = 
TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
                 .groupId(tablePartitionIdMessage(grpId))
+                .tableId(TABLE_ID)
                 .requestTypeInt(RW_UPSERT.ordinal())
                 .transactionId(txId)
                 .schemaVersion(row.schemaVersion())
@@ -2685,6 +2716,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     private void delete(UUID txId, BinaryRow row) {
         ReadWriteSingleRowPkReplicaRequest message = 
TABLE_MESSAGES_FACTORY.readWriteSingleRowPkReplicaRequest()
                 .groupId(tablePartitionIdMessage(grpId))
+                .tableId(TABLE_ID)
                 .requestTypeInt(RW_DELETE.ordinal())
                 .transactionId(txId)
                 .schemaVersion(row.schemaVersion())
@@ -2709,6 +2741,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     private CompletableFuture<BinaryRow> roGetAsync(BinaryRow row, 
HybridTimestamp readTimestamp) {
         ReadOnlySingleRowPkReplicaRequest message = 
TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest()
                 .groupId(tablePartitionIdMessage(grpId))
+                .tableId(TABLE_ID)
                 .requestTypeInt(RO_GET.ordinal())
                 .readTimestamp(readTimestamp)
                 .schemaVersion(row.schemaVersion())
@@ -2729,6 +2762,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     private CompletableFuture<ReplicaResult> 
doReadOnlyMultiGet(Collection<BinaryRow> rows, HybridTimestamp readTimestamp) {
         ReadOnlyMultiRowPkReplicaRequest request = 
TABLE_MESSAGES_FACTORY.readOnlyMultiRowPkReplicaRequest()
                 .groupId(tablePartitionIdMessage(grpId))
+                .tableId(TABLE_ID)
                 .requestTypeInt(RO_GET_ALL.ordinal())
                 .readTimestamp(readTimestamp)
                 .schemaVersion(rows.iterator().next().schemaVersion())
@@ -2741,6 +2775,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     private CompletableFuture<ReplicaResult> 
doReadOnlyDirectMultiGet(Collection<BinaryRow> rows) {
         ReadOnlyDirectMultiRowReplicaRequest request = 
TABLE_MESSAGES_FACTORY.readOnlyDirectMultiRowReplicaRequest()
                 .groupId(tablePartitionIdMessage(grpId))
+                .tableId(TABLE_ID)
                 .requestTypeInt(RO_GET_ALL.ordinal())
                 .schemaVersion(rows.iterator().next().schemaVersion())
                 .primaryKeys(binaryRowsToBuffers(rows))
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
index edee7fbdf1..b1985a6a36 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
@@ -161,10 +161,16 @@ public class TxMessageSender {
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp
     ) {
+        TablePartitionIdMessage commitPartitionIdMessage = 
REPLICA_MESSAGES_FACTORY.tablePartitionIdMessage()
+                .partitionId(commitPartition.partitionId())
+                .tableId(commitPartition.tableId())
+                .build();
+
         return replicaService.invoke(
                 primaryConsistentId,
                 TX_MESSAGES_FACTORY.txFinishReplicaRequest()
                         .txId(txId)
+                        .commitPartitionId(commitPartitionIdMessage)
                         .timestamp(clockService.now())
                         
.groupId(toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, commitPartition))
                         
.groups(toTablePartitionIdMessages(replicationGroupIds))
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishReplicaRequest.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishReplicaRequest.java
index ae39416d2e..d73c81eb56 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishReplicaRequest.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishReplicaRequest.java
@@ -44,6 +44,13 @@ public interface TxFinishReplicaRequest extends 
PrimaryReplicaRequest, Timestamp
      */
     UUID txId();
 
+    /**
+     * Returns commit partition id.
+     *
+     * @return Commit partition id.
+     */
+    TablePartitionIdMessage commitPartitionId();
+
     /**
      * Returns {@code True} if a commit request.
      *

Reply via email to