This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new f6c022a822 IGNITE-22218 Remove TableRaftService in favor of using 
RaftGroupService from Replica instances (#3973)
f6c022a822 is described below

commit f6c022a822c8e1185d65464dda0af9be23468b80
Author: Mikhail Efremov <[email protected]>
AuthorDate: Mon Aug 5 18:00:13 2024 +0600

    IGNITE-22218 Remove TableRaftService in favor of using RaftGroupService 
from Replica instances (#3973)
---
 .../ignite/client/fakes/FakeInternalTable.java     |   7 -
 modules/index/build.gradle                         |   1 +
 .../replicator/ItReplicaLifecycleTest.java         |  14 +-
 .../PartitionReplicaLifecycleManager.java          | 225 ++++++++++++++++-----
 .../ItPrimaryReplicaChoiceTest.java                |  17 +-
 .../ItPlacementDriverReplicaSideTest.java          |   1 -
 .../ignite/internal/replicator/ReplicaManager.java |  19 +-
 .../internal/replicator/ReplicaManagerTest.java    |   1 -
 .../internal/replicator/ReplicaTestUtils.java      | 139 +++++++++++++
 modules/runner/build.gradle                        |   1 +
 .../app/ItIgniteInMemoryNodeRestartTest.java       |  51 +++--
 .../runner/app/ItIgniteNodeRestartTest.java        |   4 +-
 .../ItRaftCommandLeftInLogUntilRestartTest.java    |   9 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |  11 +-
 .../exec/rel/TableScanNodeExecutionTest.java       |   9 -
 modules/table/build.gradle                         |   2 +
 .../ItAbstractInternalTableScanTest.java           |  59 +++++-
 .../ItInternalTableReadOnlyOperationsTest.java     |  15 +-
 .../ItInternalTableReadWriteScanTest.java          |  11 +-
 .../ignite/distributed/ReplicaUnavailableTest.java |   2 -
 .../rebalance/ItRebalanceDistributedTest.java      | 108 +++++-----
 .../ignite/internal/table/ItColocationTest.java    |   2 -
 .../ignite/internal/table/InternalTable.java       |   7 -
 .../apache/ignite/internal/table/TableImpl.java    |   6 -
 .../ignite/internal/table/TableRaftService.java    |  52 -----
 .../ignite/internal/table/TableViewInternal.java   |  10 -
 .../internal/table/distributed/TableManager.java   | 183 +++++++++--------
 .../distributed/storage/InternalTableImpl.java     |  15 +-
 .../distributed/storage/TableRaftServiceImpl.java  | 167 ---------------
 .../distributed/TableManagerRecoveryTest.java      |   6 +-
 .../table/distributed/TableManagerTest.java        |   2 +-
 .../storage/InternalTableEstimatedSizeTest.java    |   2 -
 .../distributed/storage/InternalTableImplTest.java |   3 -
 .../apache/ignite/distributed/ItTxTestCluster.java |   6 +-
 .../ignite/internal/table/TxAbstractTest.java      |   8 +-
 .../table/impl/DummyInternalTableImpl.java         |  21 +-
 36 files changed, 661 insertions(+), 535 deletions(-)

diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index 6c276c030c..2789d783dc 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -52,7 +52,6 @@ import org.apache.ignite.internal.schema.ColumnsExtractor;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.StreamerReceiverRunner;
-import org.apache.ignite.internal.table.TableRaftService;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
@@ -461,12 +460,6 @@ public class FakeInternalTable implements InternalTable, 
StreamerReceiverRunner
         throw new IgniteInternalException(new 
OperationNotSupportedException());
     }
 
-    @Override
-    public TableRaftService tableRaftService() {
-        throw new IgniteInternalException(new 
OperationNotSupportedException());
-    }
-
-
     @Override public TxStateTableStorage txStateStorage() {
         return null;
     }
diff --git a/modules/index/build.gradle b/modules/index/build.gradle
index 0693fbe8c8..e66f790bcf 100644
--- a/modules/index/build.gradle
+++ b/modules/index/build.gradle
@@ -81,6 +81,7 @@ dependencies {
     integrationTestImplementation testFixtures(project(':ignite-table'))
     integrationTestImplementation testFixtures(project(':ignite-storage-api'))
     integrationTestImplementation testFixtures(project(':ignite-catalog'))
+    integrationTestImplementation testFixtures(project(':ignite-replicator'))
     integrationTestImplementation libs.jetbrains.annotations
 }
 
diff --git 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
index 50eb3d999f..40e4ebbe97 100644
--- 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
+++ 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
@@ -118,6 +118,7 @@ import 
org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
 import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
 import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
 import org.apache.ignite.internal.metrics.NoOpMetricManager;
+import org.apache.ignite.internal.network.ClusterNodeImpl;
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.StaticNodeFinder;
 import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
@@ -176,6 +177,7 @@ import org.apache.ignite.internal.tx.message.TxMessageGroup;
 import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
 import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
 import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
 import org.apache.ignite.sql.IgniteSql;
@@ -304,6 +306,8 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
 
         node0.cmgManager.initCluster(List.of(node0.name), List.of(node0.name), 
"cluster");
 
+        placementDriver.setPrimary(node0.toClusterNode());
+
         nodes.values().forEach(Node::waitWatches);
 
         assertThat(
@@ -346,6 +350,7 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
     }
 
     @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-22928";)
     public void testZoneReplicaListener(TestInfo testInfo) throws Exception {
         startNodes(testInfo, 3);
 
@@ -1125,8 +1130,9 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
                     lowWatermark,
                     threadPoolsManager.tableIoExecutor(),
                     rebalanceScheduler,
-                    threadPoolsManager.partitionOperationsExecutor()
-            );
+                    threadPoolsManager.partitionOperationsExecutor(),
+                    clockService,
+                    placementDriver);
 
             StorageUpdateConfiguration storageUpdateConfiguration = 
clusterConfigRegistry.getConfiguration(StorageUpdateConfiguration.KEY);
 
@@ -1280,6 +1286,10 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
             nodeCfgGenerator.close();
             clusterCfgGenerator.close();
         }
+
+        ClusterNode toClusterNode() {
+            return new ClusterNodeImpl(name, name, networkAddress);
+        }
     }
 
     @FunctionalInterface
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index d5b7a54510..529c625c21 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -28,6 +28,8 @@ import static java.util.function.Function.identity;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toSet;
 import static 
org.apache.ignite.internal.catalog.events.CatalogEvent.ZONE_CREATE;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.subtract;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.union;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceRaftGroupEventsListener.handleReduceChanged;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.ASSIGNMENTS_SWITCH_REDUCE_PREFIX;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.PENDING_ASSIGNMENTS_PREFIX;
@@ -38,6 +40,7 @@ import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalan
 import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.stablePartAssignmentsKey;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.zoneAssignmentsGetLocally;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.zonePartitionAssignmentsGetLocally;
+import static 
org.apache.ignite.internal.hlc.HybridTimestamp.LOGICAL_TIME_BITS_SIZE;
 import static 
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
 import static 
org.apache.ignite.internal.lang.IgniteSystemProperties.getBoolean;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
@@ -53,8 +56,8 @@ import static 
org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -67,6 +70,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.BiFunction;
+import java.util.function.Predicate;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import org.apache.ignite.internal.affinity.AffinityUtils;
@@ -78,8 +82,9 @@ import 
org.apache.ignite.internal.catalog.events.CreateZoneEventParameters;
 import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
 import org.apache.ignite.internal.distributionzones.rebalance.PartitionMover;
-import org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil;
 import 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceRaftGroupEventsListener;
+import 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil;
+import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.lang.IgniteInternalException;
@@ -98,12 +103,12 @@ import 
org.apache.ignite.internal.metastorage.dsl.Condition;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
 import org.apache.ignite.internal.network.TopologyService;
 import 
org.apache.ignite.internal.partition.replicator.snapshot.FailFastSnapshotStorageFactory;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.raft.ExecutorInclinedRaftCommandRunner;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.PeersAndLearners;
 import org.apache.ignite.internal.raft.service.LeaderWithTerm;
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
-import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.Replica;
 import org.apache.ignite.internal.replicator.ReplicaManager;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
@@ -169,6 +174,15 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
      */
     private final Executor partitionOperationsExecutor;
 
+    /** Clock service for primary replica awaitings. */
+    private final ClockService clockService;
+
+    /** Placement driver for primary replicas checks. */
+    private final PlacementDriver placementDriver;
+
+    /** A predicate that checks that the given assignment is corresponded to 
the local node. */
+    private final Predicate<Assignment> isLocalNodeAssignment = assignment -> 
assignment.consistentId().equals(localNode().name());
+
     /**
      * The constructor.
      *
@@ -180,6 +194,8 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
      * @param rebalanceScheduler Executor for scheduling rebalance routine.
      * @param partitionOperationsExecutor Striped executor on which partition 
operations (potentially requiring I/O with storages)
      *     will be executed.
+     * @param clockService Clock service.
+     * @param placementDriver Placement driver.
      */
     public PartitionReplicaLifecycleManager(
             CatalogManager catalogMgr,
@@ -190,7 +206,9 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
             LowWatermark lowWatermark,
             ExecutorService ioExecutor,
             ScheduledExecutorService rebalanceScheduler,
-            Executor partitionOperationsExecutor
+            Executor partitionOperationsExecutor,
+            ClockService clockService,
+            PlacementDriver placementDriver
     ) {
         this.catalogMgr = catalogMgr;
         this.replicaMgr = replicaMgr;
@@ -201,6 +219,9 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
         this.ioExecutor = ioExecutor;
         this.rebalanceScheduler = rebalanceScheduler;
         this.partitionOperationsExecutor = partitionOperationsExecutor;
+        this.clockService = clockService;
+
+        this.placementDriver = placementDriver;
 
         pendingAssignmentsRebalanceListener = 
createPendingAssignmentsRebalanceListener();
         stableAssignmentsRebalanceListener = 
createStableAssignmentsRebalanceListener();
@@ -746,7 +767,6 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
                 ? emptySet()
                 : 
Assignments.fromBytes(stableAssignmentsWatchEvent.value()).nodes();
 
-
         return supplyAsync(() -> {
             Entry pendingAssignmentsEntry = 
metaStorageMgr.getLocally(pendingPartAssignmentsKey(zonePartitionId), revision);
 
@@ -769,13 +789,22 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
             ZonePartitionId zonePartitionId,
             Set<Assignment> stableAssignments
     ) {
-        // Update raft client peers and learners according to the actual 
assignments.
-        if (replicaMgr.isReplicaStarted(zonePartitionId)) {
-            replicaMgr.replica(zonePartitionId).join()
-                    
.raftClient().updateConfiguration(fromAssignments(stableAssignments));
-        }
+        return isLocalNodeIsPrimary(zonePartitionId).thenCompose(isLeaseholder 
-> inBusyLock(busyLock, () -> {
+            boolean isLocalInStable = 
isLocalNodeInAssignments(stableAssignments);
 
-        return nullCompletedFuture();
+            if (!isLocalInStable && !isLeaseholder) {
+                return nullCompletedFuture();
+            }
+
+            assert replicaMgr.isReplicaStarted(zonePartitionId)
+                    : "The local node is outside of the replication group 
[stable=" + stableAssignments
+                    + ", isLeaseholder=" + isLeaseholder + "].";
+
+            // Update raft client peers and learners according to the actual 
assignments.
+            return replicaMgr.replica(zonePartitionId)
+                    .thenApply(Replica::raftClient)
+                    .thenAccept(raftClient -> 
raftClient.updateConfiguration(fromAssignments(stableAssignments)));
+        }));
     }
 
     private CompletableFuture<Void> stopAndDestroyPartitionAndUpdateClients(
@@ -796,10 +825,8 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
                 .noneMatch(assignment -> 
assignment.consistentId().equals(localNode().name()));
 
         if (shouldStopLocalServices) {
-            return allOf(
-                    clientUpdateFuture,
-                    stopAndDestroyPartition(zonePartitionId)
-            );
+            return clientUpdateFuture.thenCompose(v -> 
stopAndDestroyPartition(zonePartitionId))
+                    .thenAccept(v -> { });
         } else {
             return clientUpdateFuture;
         }
@@ -844,14 +871,26 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
             return handleChangePendingAssignmentEvent(
                     zonePartitionId,
                     stableAssignments,
-                    pendingAssignments
-            ).thenCompose(v -> changePeersOnRebalance(
-                    replicaMgr,
-                    zonePartitionId,
-                    pendingAssignments.nodes(),
-                    stableAssignments == null ? emptySet() : 
stableAssignments.nodes(),
-                    revision)
-            );
+                    pendingAssignments,
+                    revision
+            ).thenCompose(v -> {
+                boolean isLocalNodeInStableOrPending = 
isNodeInReducedStableOrPendingAssignments(
+                        zonePartitionId,
+                        stableAssignments,
+                        pendingAssignments,
+                        revision
+                );
+
+                if (!isLocalNodeInStableOrPending) {
+                    return nullCompletedFuture();
+                }
+
+                return changePeersOnRebalance(
+                        replicaMgr,
+                        zonePartitionId,
+                        pendingAssignments.nodes(),
+                        revision);
+            });
         } finally {
             busyLock.leaveBusy();
         }
@@ -860,7 +899,8 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
     private CompletableFuture<Void> handleChangePendingAssignmentEvent(
             ZonePartitionId replicaGrpId,
             @Nullable Assignments stableAssignments,
-            Assignments pendingAssignments
+            Assignments pendingAssignments,
+            long revision
     ) {
         boolean pendingAssignmentsAreForced = pendingAssignments.force();
         Set<Assignment> pendingAssignmentsNodes = pendingAssignments.nodes();
@@ -912,41 +952,45 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
             localServicesStartFuture = nullCompletedFuture();
         }
 
-        return localServicesStartFuture.thenRunAsync(() -> 
inBusyLock(busyLock, () -> {
-            if (!replicaMgr.isReplicaStarted(replicaGrpId)) {
-                return;
-            }
+        return localServicesStartFuture
+                .thenComposeAsync(v -> inBusyLock(busyLock, () -> 
isLocalNodeIsPrimary(replicaGrpId)), ioExecutor)
+                .thenAcceptAsync(isLeaseholder -> inBusyLock(busyLock, () -> {
+                    boolean isLocalNodeInStableOrPending = 
isNodeInReducedStableOrPendingAssignments(
+                            replicaGrpId,
+                            stableAssignments,
+                            pendingAssignments,
+                            revision
+                    );
 
-            // For forced assignments, we exclude dead stable nodes, and all 
alive stable nodes are already in pending assignments.
-            // Union is not required in such a case.
-            Set<Assignment> newAssignments = pendingAssignmentsAreForced || 
stableAssignments == null
-                    ? pendingAssignmentsNodes
-                    : RebalanceUtil.union(pendingAssignmentsNodes, 
stableAssignments.nodes());
+                    if (!isLocalNodeInStableOrPending && !isLeaseholder) {
+                        return;
+                    }
 
-            
replicaMgr.replica(replicaGrpId).join().raftClient().updateConfiguration(fromAssignments(newAssignments));
-        }), ioExecutor);
+                    assert isLocalNodeInStableOrPending || isLeaseholder
+                            : "The local node is outside of the replication 
group [inStableOrPending=" + isLocalNodeInStableOrPending
+                            + ", isLeaseholder=" + isLeaseholder + "].";
+
+                    // For forced assignments, we exclude dead stable nodes, 
and all alive stable nodes are already in pending assignments.
+                    // Union is not required in such a case.
+                    Set<Assignment> newAssignments = 
pendingAssignmentsAreForced || stableAssignments == null
+                            ? pendingAssignmentsNodes
+                            : union(pendingAssignmentsNodes, 
stableAssignments.nodes());
+
+                    replicaMgr.replica(replicaGrpId)
+                            .thenApply(Replica::raftClient)
+                            .thenAccept(raftClient -> 
raftClient.updateConfiguration(fromAssignments(newAssignments)));
+                }), ioExecutor);
     }
 
     private CompletableFuture<Void> changePeersOnRebalance(
             ReplicaManager replicaMgr,
             ZonePartitionId replicaGrpId,
             Set<Assignment> pendingAssignments,
-            Set<Assignment> stableAssignments,
             long revision
     ) {
-        Set<Assignment> union = new HashSet<>();
-        union.addAll(pendingAssignments);
-        union.addAll(stableAssignments);
-
-        if 
(!union.stream().map(Assignment::consistentId).collect(toSet()).contains(localNode().name()))
 {
-            return nullCompletedFuture();
-        }
-
-        int partId = replicaGrpId.partitionId();
-
-        RaftGroupService partGrpSvc = 
replicaMgr.replica(replicaGrpId).join().raftClient();
-
-        return partGrpSvc.refreshAndGetLeaderWithTerm()
+        return replicaMgr.replica(replicaGrpId)
+                .thenApply(Replica::raftClient)
+                .thenCompose(raftClient -> 
raftClient.refreshAndGetLeaderWithTerm()
                 .exceptionally(throwable -> {
                     throwable = unwrapCause(throwable);
 
@@ -970,7 +1014,7 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
                     // run update of raft configuration if this node is a 
leader
                     LOG.info("Current node={} is the leader of partition raft 
group={}. "
                                     + "Initiate rebalance process for 
partition={}, zoneId={}",
-                            leaderWithTerm.leader(), replicaGrpId, partId, 
replicaGrpId.zoneId());
+                            leaderWithTerm.leader(), replicaGrpId, 
replicaGrpId.partitionId(), replicaGrpId.zoneId());
 
                     return 
metaStorageMgr.get(pendingPartAssignmentsKey(replicaGrpId))
                             .thenCompose(latestPendingAssignmentsEntry -> {
@@ -983,19 +1027,96 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
 
                                 PeersAndLearners newConfiguration = 
fromAssignments(pendingAssignments);
 
-                                CompletableFuture<Void> voidCompletableFuture 
= partGrpSvc.changePeersAndLearnersAsync(newConfiguration,
+                                CompletableFuture<Void> voidCompletableFuture 
= raftClient.changePeersAndLearnersAsync(newConfiguration,
                                         leaderWithTerm.term()).exceptionally(e 
-> {
                                             return null;
                                         });
                                 return voidCompletableFuture;
                             });
-                });
+                }));
     }
 
     private boolean isLocalPeer(Peer peer) {
         return peer.consistentId().equals(localNode().name());
     }
 
+    private boolean isLocalNodeInAssignments(Collection<Assignment> 
assignments) {
+        return assignments.stream().anyMatch(isLocalNodeAssignment);
+    }
+
+    /**
+     * Checks that the local node is primary or not.
+     * <br>
+     * Internally we use there {@link PlacementDriver#getPrimaryReplica} with 
a penultimate
+     * safe time value, because metastore is waiting for pending or stable 
assignments events handling over and only then metastore will
+     * increment the safe time. On the other hand placement driver internally 
is waiting the metastore for given safe time plus
+     * {@link ClockService#maxClockSkewMillis}. So, if given time is just 
{@link ClockService#now}, then there is a dead lock: metastore
+     * is waiting until assignments handling is over, but internally placement 
driver is waiting for a non-applied safe time.
+     * <br>
+     * To solve this issue we pass to {@link 
PlacementDriver#getPrimaryReplica} current time minus the skew, so placement 
driver could
+     * successfully get primary replica for the time stamp before the handling 
has began. Also there a corner case for tests that are using
+     * {@code WatchListenerInhibitor#metastorageEventsInhibitor} and it leads 
to current time equals {@link HybridTimestamp#MIN_VALUE} and
+     * the skew's subtraction will lead to {@link IllegalArgumentException} 
from {@link HybridTimestamp}. Then, if we got the minimal
+     * possible timestamp, then we also couldn't have any primary replica, 
then return {@code false}.
+     *
+     * @param replicationGroupId Replication group ID for that we check is the 
local node a primary.
+     * @return {@code true} is the local node is primary and {@code false} 
otherwise.
+     */
+    private CompletableFuture<Boolean> isLocalNodeIsPrimary(ReplicationGroupId 
replicationGroupId) {
+        HybridTimestamp currentSafeTime = 
metaStorageMgr.clusterTime().currentSafeTime();
+
+        if (HybridTimestamp.MIN_VALUE.equals(currentSafeTime)) {
+            return falseCompletedFuture();
+        }
+
+        long skewMs = clockService.maxClockSkewMillis();
+
+        try {
+            HybridTimestamp previousMetastoreSafeTime = 
currentSafeTime.subtractPhysicalTime(skewMs);
+
+            return placementDriver.getPrimaryReplica(replicationGroupId, 
previousMetastoreSafeTime)
+                    .thenApply(replicaMeta -> replicaMeta != null
+                            && replicaMeta.getLeaseholderId() != null
+                            && 
replicaMeta.getLeaseholderId().equals(localNode().id()));
+        } catch (IllegalArgumentException e) {
+            long currentSafeTimeMs = currentSafeTime.longValue();
+
+            throw new AssertionError("Got a negative time [currentSafeTime=" + 
currentSafeTime
+                    + ", currentSafeTimeMs=" + currentSafeTimeMs
+                    + ", skewMs=" + skewMs
+                    + ", internal=" + (currentSafeTimeMs + ((-skewMs) << 
LOGICAL_TIME_BITS_SIZE)) + "]", e);
+        }
+    }
+
+    private boolean isNodeInReducedStableOrPendingAssignments(
+            ZonePartitionId replicaGrpId,
+            @Nullable Assignments stableAssignments,
+            Assignments pendingAssignments,
+            long revision
+    ) {
+        Entry reduceEntry  = 
metaStorageMgr.getLocally(ZoneRebalanceUtil.switchReduceKey(replicaGrpId), 
revision);
+
+        Assignments reduceAssignments = reduceEntry != null
+                ? Assignments.fromBytes(reduceEntry.value())
+                : null;
+
+        Set<Assignment> reducedStableAssignments = reduceAssignments != null
+                ? subtract(stableAssignments.nodes(), 
reduceAssignments.nodes())
+                : stableAssignments.nodes();
+
+        if (!isLocalNodeInAssignments(union(reducedStableAssignments, 
pendingAssignments.nodes()))) {
+            return false;
+        }
+
+        assert replicaMgr.isReplicaStarted(replicaGrpId) : "The local node is 
outside of the replication group ["
+                + ", stable=" + stableAssignments
+                + ", pending=" + pendingAssignments
+                + ", reduce=" + reduceAssignments
+                + ", localName=" + localNode().name() + "].";
+
+        return true;
+    }
+
     @Nullable
     private Assignment localMemberAssignment(Assignments assignments) {
         Assignment localMemberAssignment = 
Assignment.forPeer(localNode().name());
diff --git 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
index c6523efc82..269d926085 100644
--- 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
+++ 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.lang.IgniteStringFormatter;
 import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.replicator.ReplicaTestUtils;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryTuple;
@@ -154,13 +155,14 @@ public class ItPrimaryReplicaChoiceTest extends 
ClusterPerTestIntegrationTest {
 
     @Test
     public void testPrimaryChangeLongHandling() throws Exception {
-        TableViewInternal tbl = 
unwrapTableImpl(node(0).tables().table(TABLE_NAME));
+        IgniteImpl node = node(0);
+        TableViewInternal tbl = 
unwrapTableImpl(node.tables().table(TABLE_NAME));
 
         var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID);
 
-        CompletableFuture<ReplicaMeta> primaryReplicaFut = 
node(0).placementDriver().awaitPrimaryReplica(
+        CompletableFuture<ReplicaMeta> primaryReplicaFut = 
node.placementDriver().awaitPrimaryReplica(
                 tblReplicationGrp,
-                node(0).clock().now(),
+                node.clock().now(),
                 AWAIT_PRIMARY_REPLICA_TIMEOUT,
                 SECONDS
         );
@@ -184,7 +186,7 @@ public class ItPrimaryReplicaChoiceTest extends 
ClusterPerTestIntegrationTest {
         CompletableFuture<String> primaryChangeTask =
                 IgniteTestUtils.runAsync(() -> 
NodeUtils.transferPrimary(nodes, tblReplicationGrp, primary));
 
-        waitingForLeaderCache(tbl, primary);
+        waitingForLeaderCache(node, tbl);
 
         assertFalse(primaryChangeTask.isDone());
 
@@ -399,12 +401,13 @@ public class ItPrimaryReplicaChoiceTest extends 
ClusterPerTestIntegrationTest {
     /**
      * Waits when the leader would be a different with the current primary 
replica.
      *
+     * @param node Ignite node.
      * @param tbl Table.
-     * @param primary Current primary replica name.
      * @throws InterruptedException If fail.
      */
-    private static void waitingForLeaderCache(TableViewInternal tbl, String 
primary) throws InterruptedException {
-        RaftGroupService raftSrvc = 
tbl.internalTable().tableRaftService().partitionRaftGroupService(0);
+    private static void waitingForLeaderCache(IgniteImpl node, 
TableViewInternal tbl) throws InterruptedException {
+        RaftGroupService raftSrvc = ReplicaTestUtils.getRaftClient(node, 
tbl.tableId(), 0)
+                        .orElseThrow(AssertionError::new);
 
         assertTrue(waitForCondition(() -> {
             raftSrvc.refreshLeader();
diff --git 
a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
 
b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
index 51f54bd185..5b88e751bd 100644
--- 
a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
+++ 
b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
@@ -528,7 +528,6 @@ public class ItPlacementDriverReplicaSideTest extends 
IgniteAbstractTest {
                     return replicaManager.startReplica(
                             groupId,
                             newConfiguration,
-                            (unused) -> { },
                             (unused) -> listener,
                             new 
PendingComparableValuesTracker<>(Long.MAX_VALUE),
                             completedFuture(raftClient));
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 344b966318..566a01ae0a 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -53,7 +53,6 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
@@ -578,7 +577,6 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
             RaftGroupListener raftGroupListener,
             boolean isVolatileStorage,
             SnapshotStorageFactory snapshotStorageFactory,
-            Consumer<RaftGroupService> updateTableRaftService,
             Function<RaftGroupService, ReplicaListener> createListener,
             PendingComparableValuesTracker<Long, Void> storageIndexTracker,
             TablePartitionId replicaGrpId,
@@ -604,7 +602,6 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
         return startReplica(
                 replicaGrpId,
                 newConfiguration,
-                updateTableRaftService,
                 createListener,
                 storageIndexTracker,
                 newRaftClientFut
@@ -618,8 +615,6 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
      * @param raftGroupListener Raft group listener for raft group starting.
      * @param isVolatileStorage is table storage volatile?
      * @param snapshotStorageFactory Snapshot storage factory for raft group 
option's parameterization.
-     * @param updateTableRaftService Temporal consumer while TableRaftService 
wouldn't be removed in
-     *      TODO: https://issues.apache.org/jira/browse/IGNITE-22218.
      * @param createListener Due to creation of ReplicaListener in 
TableManager, the function returns desired listener by created
      *      raft-client inside {@link #startReplica} method.
      * @param replicaGrpId Replication group id.
@@ -633,7 +628,6 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
             RaftGroupListener raftGroupListener,
             boolean isVolatileStorage,
             SnapshotStorageFactory snapshotStorageFactory,
-            Consumer<RaftGroupService> updateTableRaftService,
             Function<RaftGroupService, ReplicaListener> createListener,
             PendingComparableValuesTracker<Long, Void> storageIndexTracker,
             TablePartitionId replicaGrpId,
@@ -649,7 +643,6 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
                     raftGroupListener,
                     isVolatileStorage,
                     snapshotStorageFactory,
-                    updateTableRaftService,
                     createListener,
                     storageIndexTracker,
                     replicaGrpId,
@@ -745,8 +738,6 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
      *
      * @param replicaGrpId Replication group id.
      * @param newConfiguration Peers and Learners of the Raft group.
-     * @param updateTableRaftService A temporal clojure that updates table 
raft service with new raft-client, but
-     *      TODO: will be removed 
https://issues.apache.org/jira/browse/IGNITE-22218
      * @param createListener A clojure that returns done {@link 
ReplicaListener} by given raft-client {@link RaftGroupService}.
      * @param storageIndexTracker Storage index tracker.
      * @param newRaftClientFut A future that returns created raft-client.
@@ -758,7 +749,6 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
     public CompletableFuture<Replica> startReplica(
             ReplicationGroupId replicaGrpId,
             PeersAndLearners newConfiguration,
-            Consumer<RaftGroupService> updateTableRaftService,
             Function<RaftGroupService, ReplicaListener> createListener,
             PendingComparableValuesTracker<Long, Void> storageIndexTracker,
             CompletableFuture<TopologyAwareRaftGroupService> newRaftClientFut
@@ -766,11 +756,7 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
         LOG.info("Replica is about to start [replicationGroupId={}].", 
replicaGrpId);
 
         return newRaftClientFut
-                .thenApplyAsync(raftClient -> {
-                    // TODO: will be removed in 
https://issues.apache.org/jira/browse/IGNITE-22218
-                    updateTableRaftService.accept(raftClient);
-                    return createListener.apply(raftClient);
-                }, replicasCreationExecutor)
+                .thenApplyAsync(createListener, replicasCreationExecutor)
                 .thenCompose(replicaListener -> startReplica(replicaGrpId, 
storageIndexTracker, completedFuture(replicaListener)));
     }
 
@@ -1147,6 +1133,9 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
      * @param replicaGrpId Replication group id.
      * @return True if the replica is started.
      */
+    @TestOnly
+    @VisibleForTesting
+    @Deprecated
     public boolean isReplicaStarted(ReplicationGroupId replicaGrpId) {
         CompletableFuture<Replica> replicaFuture = replicas.get(replicaGrpId);
         return replicaFuture != null && isCompletedSuccessfully(replicaFuture);
diff --git 
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
 
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
index c7999187c5..380234b89d 100644
--- 
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
+++ 
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
@@ -183,7 +183,6 @@ public class ReplicaManagerTest extends 
BaseIgniteAbstractTest {
         CompletableFuture<Replica> startReplicaFuture = 
replicaManager.startReplica(
                 groupId,
                 newConfiguration,
-                (unused) -> { },
                 (unused) -> replicaListener,
                 new PendingComparableValuesTracker<>(0L),
                 completedFuture(raftGroupService)
diff --git 
a/modules/replicator/src/testFixtures/java/org/apache/ignite/internal/replicator/ReplicaTestUtils.java
 
b/modules/replicator/src/testFixtures/java/org/apache/ignite/internal/replicator/ReplicaTestUtils.java
new file mode 100644
index 0000000000..8a063a3270
--- /dev/null
+++ 
b/modules/replicator/src/testFixtures/java/org/apache/ignite/internal/replicator/ReplicaTestUtils.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.network.TopologyService;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.TestOnly;
+
+/** Utilities for working with replicas and replicas manager in tests. */
+public final class ReplicaTestUtils {
+    /**
+     * Returns raft-client if exists.
+     *
+     * @param node Ignite node that hosts the raft-client.
+     * @param tableId Desired table's ID.
+     * @param partId Desired partition's ID.
+     *
+     * @return Optional with raft-client if exists on the node by given 
identifiers.
+     */
+    @TestOnly
+    public static Optional<RaftGroupService> getRaftClient(Ignite node, int 
tableId, int partId) {
+        return getRaftClient(getReplicaManager(node), tableId, partId);
+    }
+
+    /**
+     * Returns raft-client if exists.
+     *
+     * @param replicaManager Ignite node's replica manager with replica that 
should contains a raft client.
+     * @param tableId Desired table's ID.
+     * @param partId Desired partition's ID.
+     *
+     * @return Optional with raft-client if exists on the node by given 
identifiers.
+     */
+    @TestOnly
+    public static Optional<RaftGroupService> getRaftClient(ReplicaManager 
replicaManager, int tableId, int partId) {
+        CompletableFuture<Replica> replicaFut = replicaManager
+                .replica(new TablePartitionId(tableId, partId));
+
+        if  (replicaFut == null) {
+            return Optional.empty();
+        }
+
+        try {
+            return Optional.of(replicaFut.get(15, 
TimeUnit.SECONDS).raftClient());
+        } catch (ExecutionException | InterruptedException | TimeoutException 
e) {
+            return Optional.empty();
+        }
+    }
+
+    /**
+     * Extracts {@link ReplicaManager} from the given {@link Ignite} node.
+     *
+     * @param node The given node with desired replica manager.
+     *
+     * @return Replica manager component from given node.
+     */
+    @TestOnly
+    public static ReplicaManager getReplicaManager(Ignite node) {
+        return IgniteTestUtils.getFieldValue(node, "replicaMgr");
+    }
+
+    /**
+     * Extracts {@link TopologyService} from the given {@link Ignite} node.
+     *
+     * @param node The given node with desired topology service.
+     *
+     * @return Topology service component from given node.
+     */
+    @TestOnly
+    private static TopologyService getTopologyService(Ignite node) {
+        ClusterService clusterService = IgniteTestUtils.getFieldValue(node, 
"clusterSvc");
+        return clusterService.topologyService();
+    }
+
+    /**
+     * Returns cluster node that is the leader of the corresponding partition 
group or throws an exception if it cannot be found.
+     *
+     * @param node Ignite node with raft client.
+     * @param tableId Table identifier.
+     * @param partId Partition number.
+     *
+     * @return Leader node of the partition group corresponding to the 
partition
+     */
+    @TestOnly
+    public static ClusterNode leaderAssignment(Ignite node, int tableId, int 
partId) {
+        return leaderAssignment(getReplicaManager(node), 
getTopologyService(node), tableId, partId);
+    }
+
+    /**
+     * Returns cluster node that is the leader of the corresponding partition 
group or throws an exception if it cannot be found.
+     *
+     * @param replicaManager Ignite node's replica manager with replica that 
should contains a raft client.
+     * @param topologyService Ignite node's topology service that should find 
and return leader cluster node.
+     * @param tableId Table identifier.
+     * @param partId Partition number.
+     *
+     * @return Leader node of the partition group corresponding to the 
partition
+     */
+    @TestOnly
+    public static ClusterNode leaderAssignment(ReplicaManager replicaManager, 
TopologyService topologyService, int tableId, int partId) {
+        RaftGroupService raftClient = getRaftClient(replicaManager, tableId, 
partId)
+                .orElseThrow(() -> new IgniteInternalException("No such 
partition " + partId + " in table " + tableId));
+
+        if (raftClient.leader() == null) {
+            try {
+                raftClient.refreshLeader().get(15, TimeUnit.SECONDS);
+            } catch (InterruptedException | ExecutionException | 
TimeoutException e) {
+                throw new IgniteInternalException("Couldn't get a leader for 
partition " + partId + " in table " + tableId, e);
+            }
+        }
+
+        return 
topologyService.getByConsistentId(raftClient.leader().consistentId());
+    }
+}
diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle
index 5e66356dc3..8ad1bc82d8 100644
--- a/modules/runner/build.gradle
+++ b/modules/runner/build.gradle
@@ -183,6 +183,7 @@ dependencies {
     integrationTestImplementation 
testFixtures(project(':ignite-failure-handler'))
     integrationTestImplementation testFixtures(project(':ignite-metrics:'))
     integrationTestImplementation testFixtures(project(':ignite-raft'))
+    integrationTestImplementation testFixtures(project(':ignite-replicator'))
     integrationTestImplementation libs.jetbrains.annotations
     integrationTestImplementation libs.awaitility
     integrationTestImplementation libs.rocksdb.jni
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
index 4e479fbe90..4ec569d59d 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
@@ -30,11 +30,17 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
-import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.IntFunction;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import java.util.stream.Stream;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteServer;
 import org.apache.ignite.InitParameters;
@@ -42,8 +48,9 @@ import org.apache.ignite.internal.BaseIgniteRestartTest;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.raft.Loza;
-import org.apache.ignite.internal.raft.service.LeaderWithTerm;
+import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.replicator.Replica;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.table.TableViewInternal;
@@ -171,9 +178,10 @@ public class ItIgniteInMemoryNodeRestartTest extends 
BaseIgniteRestartTest {
         TableViewInternal table = 
unwrapTableViewInternal(ignite.tables().table(TABLE_NAME));
 
         // Find the leader of the table's partition group.
-        RaftGroupService raftGroupService = 
table.internalTable().tableRaftService().partitionRaftGroupService(0);
-        LeaderWithTerm leaderWithTerm = 
raftGroupService.refreshAndGetLeaderWithTerm().join();
-        String leaderId = leaderWithTerm.leader().consistentId();
+        String leaderId = ignite.replicaManager()
+                .replica(new TablePartitionId(table.tableId(), 0))
+                .thenApply(replica -> 
replica.raftClient().leader().consistentId())
+                .get(15, TimeUnit.SECONDS);
 
         log.info("Leader is {}", leaderId);
 
@@ -196,17 +204,17 @@ public class ItIgniteInMemoryNodeRestartTest extends 
BaseIgniteRestartTest {
         String restartingNodeConsistentId = restartingNode.name();
 
         TableViewInternal restartingTable = 
unwrapTableViewInternal(restartingNode.tables().table(TABLE_NAME));
-        InternalTableImpl internalTable = (InternalTableImpl) 
restartingTable.internalTable();
+        InternalTableImpl restartingInternalTable = (InternalTableImpl) 
restartingTable.internalTable();
 
         // Check that it restarts.
         waitForCondition(
-                () -> isRaftNodeStarted(table, loza) && 
solePartitionAssignmentsContain(restartingNodeConsistentId, internalTable),
+                () -> isRaftNodeStarted(table, loza) && 
solePartitionAssignmentsContain(restartingNode, restartingInternalTable, 0),
                 TimeUnit.SECONDS.toMillis(10)
         );
 
         assertTrue(isRaftNodeStarted(table, loza), "Raft node of the partition 
is not started on " + restartingNodeConsistentId);
         assertTrue(
-                solePartitionAssignmentsContain(restartingNodeConsistentId, 
internalTable),
+                solePartitionAssignmentsContain(restartingNode, 
restartingInternalTable, 0),
                 "Assignments do not contain node " + restartingNodeConsistentId
         );
 
@@ -214,14 +222,29 @@ public class ItIgniteInMemoryNodeRestartTest extends 
BaseIgniteRestartTest {
         checkTableWithData(restartingNode, TABLE_NAME);
     }
 
-    private static boolean solePartitionAssignmentsContain(String 
restartingNodeConsistentId, InternalTableImpl internalTable) {
-        Map<Integer, List<String>> assignments = 
internalTable.tableRaftService().peersAndLearners();
+    private static boolean solePartitionAssignmentsContain(IgniteImpl 
restartingNode, InternalTableImpl table, int partId) {
+        String restartingNodeConsistentId = restartingNode.name();
+
+        TablePartitionId tablePartitionId = new 
TablePartitionId(table.tableId(), partId);
 
-        List<String> partitionAssignments = assignments.get(0);
+        CompletableFuture<Replica> replicaFut = 
restartingNode.replicaManager().replica(tablePartitionId);
 
-        return !assignments.isEmpty()
-                && partitionAssignments != null
-                && partitionAssignments.contains(restartingNodeConsistentId);
+        if (replicaFut == null) {
+            return false;
+        }
+
+        try {
+            RaftGroupService raftClient = replicaFut.get(15, 
TimeUnit.SECONDS).raftClient();
+
+            return Stream.of(raftClient.peers(), raftClient.learners())
+                    .filter(Objects::nonNull)
+                    .flatMap(Collection::stream)
+                    .map(Peer::consistentId)
+                    .collect(Collectors.toSet())
+                    .contains(restartingNodeConsistentId);
+        } catch (InterruptedException | ExecutionException | TimeoutException 
e) {
+            return false;
+        }
     }
 
     private static boolean isRaftNodeStarted(TableViewInternal table, Loza 
loza) {
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 159702e537..c06c0deff5 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -679,7 +679,9 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                         lowWatermark,
                         threadPoolsManager.tableIoExecutor(),
                         rebalanceScheduler,
-                        threadPoolsManager.partitionOperationsExecutor()
+                        threadPoolsManager.partitionOperationsExecutor(),
+                        clockService,
+                        placementDriverManager.placementDriver()
                 )
         );
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
index 51b1135e20..b1ee929d55 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.lang.IgniteStringFormatter;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.replicator.ReplicaTestUtils;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowEx;
 import org.apache.ignite.internal.schema.SchemaRegistry;
@@ -170,7 +171,7 @@ public class ItRaftCommandLeftInLogUntilRestartTest extends 
ClusterPerClassInteg
 
         TableViewInternal table = (TableViewInternal) 
createTable(DEFAULT_TABLE_NAME, 2, 1);
 
-        ClusterNode leader = 
table.internalTable().tableRaftService().leaderAssignment(0);
+        ClusterNode leader = ReplicaTestUtils.leaderAssignment(node0, 
table.tableId(), 0);
 
         boolean isNode0Leader = node0.id().equals(leader.id());
 
@@ -189,7 +190,8 @@ public class ItRaftCommandLeftInLogUntilRestartTest extends 
ClusterPerClassInteg
 
             assertTrue(IgniteTestUtils.waitForCondition(() -> 
appliedIndexNode0.get() == appliedIndexNode1.get(), 10_000));
 
-            RaftGroupService raftGroupService = 
table.internalTable().tableRaftService().partitionRaftGroupService(0);
+            RaftGroupService raftGroupService = 
ReplicaTestUtils.getRaftClient(node0, table.tableId(), 0)
+                    .orElseThrow(AssertionError::new);
 
             raftGroupService.peers().forEach(peer -> 
assertThat(raftGroupService.snapshot(peer), willCompleteSuccessfully()));
 
@@ -341,7 +343,8 @@ public class ItRaftCommandLeftInLogUntilRestartTest extends 
ClusterPerClassInteg
     private void transferLeadershipToLocalNode(IgniteImpl ignite) {
         TableViewInternal table = (TableViewInternal) 
ignite.tables().table(DEFAULT_TABLE_NAME);
 
-        RaftGroupService raftGroupService = 
table.internalTable().tableRaftService().partitionRaftGroupService(0);
+        RaftGroupService raftGroupService = 
ReplicaTestUtils.getRaftClient(ignite, table.tableId(), 0)
+                .orElseThrow(AssertionError::new);
 
         List<Peer> peers = raftGroupService.peers();
         assertNotNull(peers);
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 28bcdc769e..47ce9163ca 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -784,7 +784,9 @@ public class IgniteImpl implements Ignite {
                 lowWatermark,
                 threadPoolsManager.tableIoExecutor(),
                 rebalanceScheduler,
-                threadPoolsManager.partitionOperationsExecutor()
+                threadPoolsManager.partitionOperationsExecutor(),
+                clockService,
+                placementDriverMgr.placementDriver()
         );
 
         TransactionConfiguration txConfig = 
clusterConfigRegistry.getConfiguration(TransactionConfiguration.KEY);
@@ -1692,4 +1694,11 @@ public class IgniteImpl implements Ignite {
     public LowWatermarkImpl lowWatermark() {
         return lowWatermark;
     }
+
+    /** Returns replicas manager. */
+    @TestOnly
+    public ReplicaManager replicaManager() {
+        return replicaMgr;
+    }
+
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
index 244b81ae94..eb6a1cec8d 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
@@ -27,7 +27,6 @@ import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
 import java.util.BitSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -56,7 +55,6 @@ import 
org.apache.ignite.internal.network.SingleClusterNodeResolver;
 import org.apache.ignite.internal.network.TopologyService;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
-import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowEx;
@@ -76,7 +74,6 @@ import org.apache.ignite.internal.sql.engine.util.TypeUtils;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.table.StreamerReceiverRunner;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
-import 
org.apache.ignite.internal.table.distributed.storage.TableRaftServiceImpl;
 import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
@@ -253,12 +250,6 @@ public class TableScanNodeExecutionTest extends 
AbstractExecutionTest<Object[]>
                     mock(HybridClock.class),
                     timestampTracker,
                     mock(PlacementDriver.class),
-                    new TableRaftServiceImpl(
-                            "test",
-                            PART_CNT,
-                            Int2ObjectMaps.singleton(0, 
mock(RaftGroupService.class)),
-                            new 
SingleClusterNodeResolver(mock(ClusterNode.class))
-                    ),
                     mock(TransactionInflights.class),
                     3_000,
                     0,
diff --git a/modules/table/build.gradle b/modules/table/build.gradle
index ba33cc2bed..483949f30a 100644
--- a/modules/table/build.gradle
+++ b/modules/table/build.gradle
@@ -116,6 +116,7 @@ dependencies {
     
testFixturesImplementation(testFixtures(project(':ignite-failure-handler')))
     testFixturesImplementation(testFixtures(project(':ignite-metrics')))
     testFixturesImplementation(testFixtures(project(':ignite-raft')))
+    testFixturesImplementation(testFixtures(project(':ignite-replicator')))
     testFixturesImplementation libs.jetbrains.annotations
     testFixturesImplementation libs.fastutil.core
     testFixturesImplementation libs.mockito.core
@@ -167,6 +168,7 @@ dependencies {
     
integrationTestImplementation(testFixtures(project(':ignite-failure-handler')))
     integrationTestImplementation(testFixtures(project(':ignite-metrics')))
     integrationTestImplementation(testFixtures(project(':ignite-sql-engine')))
+    integrationTestImplementation(testFixtures(project(':ignite-replicator')))
     integrationTestImplementation libs.fastutil.core
     integrationTestImplementation libs.jetbrains.annotations
     integrationTestImplementation libs.calcite.core
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
index 0049db71ef..12572ef4e2 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
@@ -45,7 +45,12 @@ import java.util.concurrent.atomic.AtomicReference;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.network.ClusterNodeResolver;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
 import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
@@ -63,6 +68,7 @@ import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
 import org.apache.ignite.internal.type.NativeTypes;
+import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -95,13 +101,44 @@ public abstract class ItAbstractInternalTableScanTest 
extends IgniteAbstractTest
     /** Internal table to test. */
     DummyInternalTableImpl internalTbl;
 
+    PlacementDriver placementDriver;
+
+    ClusterNodeResolver clusterNodeResolver;
+
     /**
      * Prepare test environment using DummyInternalTableImpl and Mocked 
storage.
      */
     @BeforeEach
     public void setUp(TestInfo testInfo) {
-        internalTbl = new DummyInternalTableImpl(mock(ReplicaService.class), 
mockStorage, ROW_SCHEMA, txConfiguration,
-                storageUpdateConfiguration);
+        clusterNodeResolver = new ClusterNodeResolver() {
+
+            private final ClusterNode singleNode = 
DummyInternalTableImpl.LOCAL_NODE;
+
+            @Override
+            public @Nullable ClusterNode getByConsistentId(String 
consistentId) {
+                return singleNode.name().equals(consistentId)
+                        ? singleNode
+                        : null;
+            }
+
+            @Override
+            public @Nullable ClusterNode getById(String id) {
+                return singleNode.id().equals(id)
+                        ? singleNode
+                        : null;
+            }
+        };
+
+        placementDriver = new 
TestPlacementDriver(DummyInternalTableImpl.LOCAL_NODE);
+
+        internalTbl = new DummyInternalTableImpl(
+                mock(ReplicaService.class),
+                placementDriver,
+                mockStorage,
+                ROW_SCHEMA,
+                txConfiguration,
+                storageUpdateConfiguration
+        );
     }
 
     /**
@@ -515,6 +552,24 @@ public abstract class ItAbstractInternalTableScanTest 
extends IgniteAbstractTest
         );
     }
 
+    /**
+     * Resolves primary replica node for given replication group.
+     *
+     * @param replicationGroupId Desired replication group ID.
+     * @return Primary replica {@link ClusterNode} for the group.
+     */
+    protected ClusterNode getPrimaryReplica(ReplicationGroupId 
replicationGroupId) {
+        return placementDriver.awaitPrimaryReplica(
+                        replicationGroupId,
+                        DummyInternalTableImpl.CLOCK.now(),
+                        DummyInternalTableImpl.AWAIT_PRIMARY_REPLICA_TIMEOUT,
+                        TimeUnit.SECONDS
+                )
+                .thenApply(ReplicaMeta::getLeaseholder)
+                .thenApply(clusterNodeResolver::getByConsistentId)
+                .join();
+    }
+
     /**
      * Either read-write or read-only publisher producer.
      *
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
index 6ca4986d8c..20553e25a8 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
@@ -46,6 +46,8 @@ import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyMultiRowPkReplicaRequest;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlySingleRowPkReplicaRequest;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowConverter;
@@ -115,12 +117,23 @@ public class ItInternalTableReadOnlyOperationsTest 
extends IgniteAbstractTest {
     @Mock
     private BinaryRowEx someRow;
 
+    private PlacementDriver placementDriver;
+
     /**
      * Prepare test environment using DummyInternalTableImpl and Mocked 
storage.
      */
     @BeforeEach
     public void setUp(TestInfo testInfo) {
-        internalTbl = new DummyInternalTableImpl(replicaService, mockStorage, 
SCHEMA, txConfiguration, storageUpdateConfiguration);
+        placementDriver = new 
TestPlacementDriver(DummyInternalTableImpl.LOCAL_NODE);
+
+        internalTbl = new DummyInternalTableImpl(
+                replicaService,
+                placementDriver,
+                mockStorage,
+                SCHEMA,
+                txConfiguration,
+                storageUpdateConfiguration
+        );
 
         lenient().when(readOnlyTx.isReadOnly()).thenReturn(true);
         lenient().when(readOnlyTx.readTimestamp()).thenReturn(CLOCK.now());
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
index 99cd51a6ec..5fba2bd011 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
@@ -21,12 +21,10 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.util.concurrent.Flow.Publisher;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
-import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.RollbackTxOnErrorPublisher;
-import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.utils.PrimaryReplica;
@@ -76,11 +74,14 @@ public class ItInternalTableReadWriteScanTest extends 
ItAbstractInternalTableSca
         InternalTransaction tx = 
internalTbl.txManager().begin(HYBRID_TIMESTAMP_TRACKER);
 
         TablePartitionId tblPartId = new 
TablePartitionId(internalTbl.tableId(), ((TablePartitionId) 
internalTbl.groupId()).partitionId());
-        RaftGroupService raftSvc = 
internalTbl.tableRaftService().partitionRaftGroupService(tblPartId.partitionId());
-        long term = 
IgniteTestUtils.await(raftSvc.refreshAndGetLeaderWithTerm()).term();
+
+        long term = 1L;
 
         tx.assignCommitPartition(tblPartId);
-        tx.enlist(tblPartId, new 
IgniteBiTuple<>(internalTbl.tableRaftService().leaderAssignment(tblPartId.partitionId()),
 term));
+
+        ClusterNode primaryReplicaNode = getPrimaryReplica(tblPartId);
+
+        tx.enlist(tblPartId, new IgniteBiTuple<>(primaryReplicaNode, term));
 
         return tx;
     }
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
index 0ada927361..aae2ca7d88 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
@@ -245,7 +245,6 @@ public class ReplicaUnavailableTest extends 
IgniteAbstractTest {
                         replicaManager.startReplica(
                                 tablePartitionId,
                                 newConfiguration,
-                                (unused) -> { },
                                 (unused) -> listener,
                                 new PendingComparableValuesTracker<>(0L),
                                 
completedFuture(mock(TopologyAwareRaftGroupService.class))
@@ -363,7 +362,6 @@ public class ReplicaUnavailableTest extends 
IgniteAbstractTest {
                     replicaManager.startReplica(
                             tablePartitionId,
                             newConfiguration,
-                            (unused) -> { },
                             (unused) -> listener,
                             new PendingComparableValuesTracker<>(0L),
                             
completedFuture(mock(TopologyAwareRaftGroupService.class))
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index fcc7294998..c5abadb475 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -156,19 +156,20 @@ import 
org.apache.ignite.internal.pagememory.configuration.schema.VolatilePageMe
 import 
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
 import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
+import org.apache.ignite.internal.placementdriver.TestReplicaMetaImpl;
 import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.RaftNodeId;
 import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
-import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.raft.storage.LogStorageFactory;
 import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory;
 import org.apache.ignite.internal.raft.util.SharedLogStorageFactoryUtils;
 import org.apache.ignite.internal.replicator.Replica;
 import org.apache.ignite.internal.replicator.ReplicaManager;
 import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.ReplicaTestUtils;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import 
org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration;
@@ -188,7 +189,6 @@ import 
org.apache.ignite.internal.storage.pagememory.configuration.schema.Persis
 import 
org.apache.ignite.internal.storage.pagememory.configuration.schema.VolatilePageMemoryStorageEngineExtensionConfigurationSchema;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.StreamerReceiverRunner;
-import org.apache.ignite.internal.table.TableRaftService;
 import org.apache.ignite.internal.table.TableTestUtils;
 import org.apache.ignite.internal.table.TableViewInternal;
 import org.apache.ignite.internal.table.distributed.TableManager;
@@ -359,6 +359,8 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
 
         assertTrue(waitForCondition(() -> getPartitionClusterNodes(node, 
0).size() == 1, AWAIT_TIMEOUT_MILLIS));
 
+        electPrimaryReplica(node);
+
         alterZone(node, ZONE_NAME, 2);
 
         waitPartitionAssignmentsSyncedToExpected(0, 2);
@@ -378,6 +380,8 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
 
         assertTrue(waitForCondition(() -> getPartitionClusterNodes(node, 
0).size() == 1, AWAIT_TIMEOUT_MILLIS));
 
+        electPrimaryReplica(node);
+
         alterZone(node, ZONE_NAME, 2);
 
         waitPartitionAssignmentsSyncedToExpected(TABLE_NAME, 0, 2);
@@ -399,6 +403,8 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
 
         assertTrue(waitForCondition(() -> getPartitionClusterNodes(node, 
0).size() == 1, AWAIT_TIMEOUT_MILLIS));
 
+        electPrimaryReplica(node);
+
         alterZone(node, ZONE_NAME, 2);
         alterZone(node, ZONE_NAME, 3);
 
@@ -417,6 +423,8 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
 
         assertTrue(waitForCondition(() -> getPartitionClusterNodes(node, 
0).size() == 1, AWAIT_TIMEOUT_MILLIS));
 
+        electPrimaryReplica(node);
+
         alterZone(node, ZONE_NAME, 2);
         alterZone(node, ZONE_NAME, 3);
         alterZone(node, ZONE_NAME, 2);
@@ -442,21 +450,31 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
 
         waitPartitionAssignmentsSyncedToExpected(0, 2);
 
+        electPrimaryReplica(node0);
+
         Set<String> partitionNodesConsistentIds = 
getPartitionClusterNodes(node0, 0).stream()
                 .map(Assignment::consistentId)
                 .collect(toSet());
 
         Node newNode = nodes.stream().filter(n -> 
!partitionNodesConsistentIds.contains(n.name)).findFirst().orElseThrow();
 
-        Node leaderNode = 
findNodeByConsistentId(table.tableRaftService().leaderAssignment(0).name());
+        ClusterNode leaderClusterNode = ReplicaTestUtils.leaderAssignment(
+                node1.replicaManager,
+                node1.clusterService.topologyService(),
+                table.tableId(),
+                0
+        );
+
+        Node leaderNode = findNodeByConsistentId(leaderClusterNode.name());
 
         String nonLeaderNodeConsistentId = partitionNodesConsistentIds.stream()
                 .filter(n -> !n.equals(leaderNode.name))
                 .findFirst()
                 .orElseThrow();
 
-        TableViewInternal nonLeaderTable =
-                (TableViewInternal) 
findNodeByConsistentId(nonLeaderNodeConsistentId).tableManager.table(TABLE_NAME);
+        Node nonLeaderNode = findNodeByConsistentId(nonLeaderNodeConsistentId);
+
+        TableViewInternal nonLeaderTable = (TableViewInternal) 
nonLeaderNode.tableManager.table(TABLE_NAME);
 
         var countDownLatch = new CountDownLatch(1);
 
@@ -481,10 +499,10 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
 
         assertTrue(countDownLatch.await(10, SECONDS));
 
-        TableRaftService tableRaftService = 
nonLeaderTable.internalTable().tableRaftService();
-
         assertThat(
-                
tableRaftService.partitionRaftGroupService(0).transferLeadership(new 
Peer(nonLeaderNodeConsistentId)),
+                ReplicaTestUtils.getRaftClient(nonLeaderNode.replicaManager, 
nonLeaderTable.tableId(), 0)
+                        .map(raftClient -> raftClient.transferLeadership(new 
Peer(nonLeaderNodeConsistentId)))
+                        .orElse(null),
                 willCompleteSuccessfully()
         );
 
@@ -509,6 +527,8 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
 
         waitPartitionAssignmentsSyncedToExpected(0, 1);
 
+        electPrimaryReplica(node);
+
         JraftServerImpl raftServer = (JraftServerImpl) nodes.stream()
                 .filter(n -> n.raftManager.localNodes().stream().anyMatch(grp 
-> grp.toString().contains("_part_")))
                 .findFirst()
@@ -551,6 +571,8 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
 
         waitPartitionAssignmentsSyncedToExpected(0, 2);
 
+        electPrimaryReplica(node);
+
         Set<Assignment> assignmentsAfterChangeReplicas = 
getPartitionClusterNodes(node, 0);
 
         Set<Assignment> evictedAssignments = 
getEvictedAssignments(assignmentsBeforeChangeReplicas, 
assignmentsAfterChangeReplicas);
@@ -653,6 +675,8 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
 
         waitPartitionAssignmentsSyncedToExpected(0, 1);
 
+        electPrimaryReplica(node);
+
         alterZone(node, ZONE_NAME, 3);
 
         waitPartitionAssignmentsSyncedToExpected(0, 3);
@@ -676,6 +700,8 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
 
         waitPartitionAssignmentsSyncedToExpected(0, 1);
 
+        electPrimaryReplica(node);
+
         Set<Assignment> assignmentsBeforeRebalance = 
getPartitionClusterNodes(node, 0);
 
         String newNodeNameForAssignment = nodes.stream()
@@ -742,6 +768,8 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
 
         waitPartitionAssignmentsSyncedToExpected(0, 1);
 
+        electPrimaryReplica(node);
+
         var assignmentsBeforeRebalance = getPartitionClusterNodes(node, 0);
         String nodeNameAssignedBeforeRebalance = 
assignmentsBeforeRebalance.stream()
                 .findFirst()
@@ -813,22 +841,9 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
     }
 
     private static boolean isNodeUpdatesPeersOnGroupService(Node node, 
Set<Peer> desiredPeers) {
-        // TODO: will be replaced with replica usage in 
https://issues.apache.org/jira/browse/IGNITE-22218
-        TableRaftService tblRaftSvc = node.tableManager.startedTables()
-                                .get(getTableId(node, TABLE_NAME))
-                                .internalTable()
-                .tableRaftService();
-        RaftGroupService groupService;
-        try {
-            groupService = tblRaftSvc.partitionRaftGroupService(0);
-        } catch (IgniteInternalException e) {
-            return false;
-        }
-        List<Peer> peersList = groupService.peers();
-        boolean isUpdated = peersList.stream()
-                                .collect(toSet())
-                .equals(desiredPeers);
-        return isUpdated;
+        return ReplicaTestUtils.getRaftClient(node.replicaManager, 
getTableId(node, TABLE_NAME), 0)
+                .map(raftClient -> 
raftClient.peers().stream().collect(toSet()).equals(desiredPeers))
+                .orElse(false);
     }
 
     private void clearSpyInvocations() {
@@ -899,28 +914,9 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
         Node anyNode = nodes.get(0);
         Set<Assignment> assignments = getPartitionClusterNodes(anyNode, 
tableName, replicasNum);
         assertTrue(waitForCondition(
-                () -> {
-                    try {
-                        return nodes.stream()
-                                .filter(n -> isNodeInAssignments(n, 
assignments))
-                                .allMatch(n -> {
-                                    // TODO: will be replaced with replica 
usage in https://issues.apache.org/jira/browse/IGNITE-22218
-                                    TableRaftService trs = n.tableManager
-                                            .cachedTable(getTableId(n, 
tableName))
-                                            .internalTable()
-                                            .tableRaftService();
-
-                                    try {
-                                        return 
trs.partitionRaftGroupService(partNum) != null;
-                                    } catch (IgniteInternalException e) {
-                                        return false;
-                                    }
-                                });
-                    } catch (IgniteInternalException e) {
-                        // Raft group service not found.
-                        return false;
-                    }
-                },
+                () -> nodes.stream()
+                        .filter(n -> isNodeInAssignments(n, assignments))
+                        .allMatch(n -> 
ReplicaTestUtils.getRaftClient(n.replicaManager, getTableId(n, tableName), 
partNum).isPresent()),
                 (long) AWAIT_TIMEOUT_MILLIS * nodes.size()
         ));
     }
@@ -947,6 +943,22 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
         return IntStream.range(0, nodes.size()).filter(i -> 
getNode(i).name.equals(consistentId)).findFirst().orElseThrow();
     }
 
+    private void electPrimaryReplica(Node node) {
+        Set<Assignment> assignments = getPartitionClusterNodes(node, 0);
+
+        String leaseholderConsistentId = 
assignments.stream().findFirst().get().consistentId();
+
+        ClusterNode leaseholder = nodes.stream()
+                .filter(n -> 
n.clusterService.topologyService().localMember().name().equals(leaseholderConsistentId))
+                .findFirst()
+                .get()
+                .clusterService
+                .topologyService()
+                .localMember();
+
+        node.placementDriver.setPrimaryReplicaSupplier(() -> new 
TestReplicaMetaImpl(leaseholder.name(), leaseholder.id()));
+    }
+
     private static Set<Assignment> getPartitionClusterNodes(Node node, int 
partNum) {
         return getPartitionClusterNodes(node, TABLE_NAME, partNum);
     }
@@ -1360,7 +1372,9 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
                             lowWatermark,
                             threadPoolsManager.tableIoExecutor(),
                             rebalanceScheduler,
-                            threadPoolsManager.partitionOperationsExecutor()
+                            threadPoolsManager.partitionOperationsExecutor(),
+                            clockService,
+                            placementDriver
                     )
             ) {
                 @Override
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index 5302727ff0..2cab2d6cc5 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -96,7 +96,6 @@ import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import 
org.apache.ignite.internal.table.distributed.schema.ConstantSchemaVersions;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
-import 
org.apache.ignite.internal.table.distributed.storage.TableRaftServiceImpl;
 import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
 import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
@@ -295,7 +294,6 @@ public class ItColocationTest extends 
BaseIgniteAbstractTest {
                 new HybridClockImpl(),
                 observableTimestampTracker,
                 new TestPlacementDriver(clusterNode),
-                new TableRaftServiceImpl("PUBLIC.TEST", PARTS, partRafts, new 
SingleClusterNodeResolver(clusterNode)),
                 transactionInflights,
                 3_000,
                 0,
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
index 51189f5c6f..ee381cd859 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -449,13 +449,6 @@ public interface InternalTable extends ManuallyCloseable {
      */
     TxStateTableStorage txStateStorage();
 
-    /**
-     * Raft service for this table.
-     *
-     * @return Table raft service.
-     */
-    TableRaftService tableRaftService();
-
     // TODO: IGNITE-14488. Add invoke() methods.
 
     /**
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java 
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
index e8f874d37b..aae27fa43a 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
@@ -43,7 +43,6 @@ import 
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage
 import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
 import org.apache.ignite.internal.table.partition.HashPartitionManagerImpl;
 import org.apache.ignite.internal.tx.LockManager;
-import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.sql.IgniteSql;
 import org.apache.ignite.table.KeyValueView;
 import org.apache.ignite.table.RecordView;
@@ -210,11 +209,6 @@ public class TableImpl implements TableViewInternal {
         return tbl.partition(keyRow);
     }
 
-    @Override
-    public ClusterNode leaderAssignment(int partition) {
-        return tbl.tableRaftService().leaderAssignment(partition);
-    }
-
     /** Returns a supplier of index storage wrapper factories for given 
partition. */
     public TableIndexStoragesSupplier indexStorageAdapters(int partId) {
         return () -> {
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableRaftService.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableRaftService.java
deleted file mode 100644
index 8915cfe24e..0000000000
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableRaftService.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.table;
-
-import org.apache.ignite.internal.close.ManuallyCloseable;
-import org.apache.ignite.internal.lang.IgniteInternalException;
-import org.apache.ignite.internal.raft.service.RaftGroupService;
-import org.apache.ignite.network.ClusterNode;
-
-/**
- * Internal facade that provides methods for table's raft operations.
- */
-public interface TableRaftService extends ManuallyCloseable {
-
-    /**
-     * Returns cluster node that is the leader of the corresponding partition 
group or throws an exception if it cannot be found.
-     *
-     * @param partition partition number
-     * @return leader node of the partition group corresponding to the 
partition
-     */
-    ClusterNode leaderAssignment(int partition);
-
-    /**
-     * Returns raft group client for corresponding partition.
-     *
-     * @param partition partition number
-     * @return raft group client for corresponding partition
-     * @throws IgniteInternalException if partition can't be found.
-     */
-    RaftGroupService partitionRaftGroupService(int partition);
-
-    /**
-     * Closes the service.
-     */
-    @Override
-    void close();
-}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableViewInternal.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableViewInternal.java
index 56878a266e..ceea5cadd1 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableViewInternal.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableViewInternal.java
@@ -22,7 +22,6 @@ import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
 import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
 import org.apache.ignite.internal.table.distributed.PartitionSet;
-import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.table.Table;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.table.mapper.Mapper;
@@ -77,15 +76,6 @@ public interface TableViewInternal extends Table {
      */
     <K> int partition(K key, Mapper<K> keyMapper);
 
-    /**
-     * Returns cluster node that is the leader of the corresponding partition 
group or throws an exception if
-     * it cannot be found.
-     *
-     * @param partition Partition number.
-     * @return Leader node of the partition group corresponding to the 
partition.
-     */
-    ClusterNode leaderAssignment(int partition);
-
     /**
      * Registers the index with given id in a table.
      *
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 656a6381a5..613f457ca3 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -42,6 +42,7 @@ import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUt
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.tableAssignmentsGetLocally;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.union;
 import static org.apache.ignite.internal.event.EventListener.fromConsumer;
+import static 
org.apache.ignite.internal.hlc.HybridTimestamp.LOGICAL_TIME_BITS_SIZE;
 import static 
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
@@ -62,7 +63,6 @@ import static 
org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
 import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
 import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import java.nio.file.Path;
 import java.util.ArrayList;
@@ -194,7 +194,6 @@ import 
org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
 import org.apache.ignite.internal.table.distributed.schema.SchemaVersionsImpl;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
 import org.apache.ignite.internal.table.distributed.storage.PartitionStorages;
-import 
org.apache.ignite.internal.table.distributed.storage.TableRaftServiceImpl;
 import 
org.apache.ignite.internal.table.distributed.wrappers.ExecutorInclinedPlacementDriver;
 import org.apache.ignite.internal.thread.IgniteThreadFactory;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
@@ -1038,11 +1037,6 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
 
         TablePartitionId replicaGrpId = new TablePartitionId(tableId, partId);
 
-        // TODO: will be removed in 
https://issues.apache.org/jira/browse/IGNITE-22218
-        Consumer<RaftGroupService> updateTableRaftService = raftClient -> 
internalTbl
-                .tableRaftService()
-                .updateInternalTableRaftGroupService(partId, raftClient);
-
         CompletableFuture<Boolean> shouldStartGroupFut = isRecovery
                 ? partitionReplicatorNodeRecovery.initiateGroupReentryIfNeeded(
                         replicaGrpId,
@@ -1120,7 +1114,6 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                                 raftGroupListener,
                                 mvTableStorage.isVolatile(),
                                 snapshotStorageFactory,
-                                updateTableRaftService,
                                 createListener,
                                 storageIndexTracker,
                                 replicaGrpId,
@@ -1243,6 +1236,50 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         return assignments.stream().anyMatch(isLocalNodeAssignment);
     }
 
+    /**
+     * Checks that the local node is primary or not.
+     * <br>
+     * Internally we use there {@link PlacementDriver#getPrimaryReplica} with 
a penultimate
+     * safe time value, because metastore is waiting for pending or stable 
assignments events handling over and only then metastore will
+     * increment the safe time. On the other hand placement driver internally 
is waiting the metastore for given safe time plus
+     * {@link ClockService#maxClockSkewMillis}. So, if given time is just 
{@link ClockService#now}, then there is a dead lock: metastore
+     * is waiting until assignments handling is over, but internally placement 
driver is waiting for a non-applied safe time.
+     * <br>
+     * To solve this issue we pass to {@link 
PlacementDriver#getPrimaryReplica} current time minus the skew, so placement 
driver could
+     * successfully get primary replica for the time stamp before the handling 
has began. Also there a corner case for tests that are using
+     * {@code WatchListenerInhibitor#metastorageEventsInhibitor} and it leads 
to current time equals {@link HybridTimestamp#MIN_VALUE} and
+     * the skew's subtraction will lead to {@link IllegalArgumentException} 
from {@link HybridTimestamp}. Then, if we got the minimal
+     * possible timestamp, then we also couldn't have any primary replica, 
then return {@code false}.
+     *
+     * @param replicationGroupId Replication group ID for that we check is the 
local node a primary.
+     * @return {@code true} is the local node is primary and {@code false} 
otherwise.
+     */
+    private CompletableFuture<Boolean> isLocalNodeIsPrimary(ReplicationGroupId 
replicationGroupId) {
+        HybridTimestamp currentSafeTime = 
metaStorageMgr.clusterTime().currentSafeTime();
+
+        if (HybridTimestamp.MIN_VALUE.equals(currentSafeTime)) {
+            return falseCompletedFuture();
+        }
+
+        long skewMs = clockService.maxClockSkewMillis();
+
+        try {
+            HybridTimestamp previousMetastoreSafeTime = 
currentSafeTime.subtractPhysicalTime(skewMs);
+
+            return 
executorInclinedPlacementDriver.getPrimaryReplica(replicationGroupId, 
previousMetastoreSafeTime)
+                    .thenApply(replicaMeta -> replicaMeta != null
+                            && replicaMeta.getLeaseholderId() != null
+                            && 
replicaMeta.getLeaseholderId().equals(localNode().id()));
+        } catch (IllegalArgumentException e) {
+            long currentSafeTimeMs = currentSafeTime.longValue();
+
+            throw new AssertionError("Got a negative time [currentSafeTime=" + 
currentSafeTime
+                    + ", currentSafeTimeMs=" + currentSafeTimeMs
+                    + ", skewMs=" + skewMs
+                    + ", internal=" + (currentSafeTimeMs + ((-skewMs) << 
LOGICAL_TIME_BITS_SIZE)) + "]", e);
+        }
+    }
+
     private PartitionDataStorage partitionDataStorage(MvPartitionStorage 
partitionStorage, InternalTable internalTbl, int partId) {
         return new SnapshotAwarePartitionDataStorage(
                 partitionStorage,
@@ -1369,13 +1406,6 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
 
         int partitions = zoneDescriptor.partitions();
 
-        TableRaftServiceImpl tableRaftService = new TableRaftServiceImpl(
-                tableName,
-                partitions,
-                new Int2ObjectOpenHashMap<>(partitions),
-                topologyService
-        );
-
         InternalTableImpl internalTable = new InternalTableImpl(
                 tableName,
                 tableDescriptor.id(),
@@ -1388,7 +1418,6 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                 clock,
                 observableTimestampTracker,
                 executorInclinedPlacementDriver,
-                tableRaftService,
                 transactionInflights,
                 implicitTransactionTimeout,
                 attemptsObtainLock,
@@ -1965,7 +1994,7 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                                 return nullCompletedFuture();
                             }
 
-                            return changePeersOnRebalance(table, replicaGrpId, 
pendingAssignments.nodes(), revision);
+                            return changePeersOnRebalance(replicaGrpId, 
pendingAssignments.nodes(), revision);
                         });
                     } finally {
                         busyLock.leaveBusy();
@@ -2059,7 +2088,7 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         }
 
         return localServicesStartFuture
-                .thenComposeAsync(v -> inBusyLock(busyLock, () -> 
isLocalNodeLeaseholder(replicaGrpId)), ioExecutor)
+                .thenComposeAsync(v -> inBusyLock(busyLock, () -> 
isLocalNodeIsPrimary(replicaGrpId)), ioExecutor)
                 .thenAcceptAsync(isLeaseholder -> inBusyLock(busyLock, () -> {
                     boolean isLocalNodeInStableOrPending = 
isNodeInReducedStableOrPendingAssignments(
                             replicaGrpId,
@@ -2082,10 +2111,9 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                             ? pendingAssignmentsNodes
                             : union(pendingAssignmentsNodes, 
stableAssignments.nodes());
 
-                    tbl.internalTable()
-                            .tableRaftService()
-                            .partitionRaftGroupService(partitionId)
-                            
.updateConfiguration(fromAssignments(newAssignments));
+                    replicaMgr.replica(replicaGrpId)
+                            .thenApply(Replica::raftClient)
+                            .thenAccept(raftClient -> 
raftClient.updateConfiguration(fromAssignments(newAssignments)));
                 }), ioExecutor);
     }
 
@@ -2119,56 +2147,60 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
     }
 
     private CompletableFuture<Void> changePeersOnRebalance(
-            // TODO: remove excessive argument (used to get raft-client) 
https://issues.apache.org/jira/browse/IGNITE-22218
-            TableImpl table,
             TablePartitionId replicaGrpId,
             Set<Assignment> pendingAssignments,
             long revision
     ) {
-        int partId = replicaGrpId.partitionId();
-
-        RaftGroupService partGrpSvc = 
table.internalTable().tableRaftService().partitionRaftGroupService(partId);
+        return replicaMgr.replica(replicaGrpId)
+                .thenApply(Replica::raftClient)
+                .thenCompose(raftClient -> 
raftClient.refreshAndGetLeaderWithTerm()
+                        .exceptionally(throwable -> {
+                            throwable = unwrapCause(throwable);
 
-        return partGrpSvc.refreshAndGetLeaderWithTerm()
-                .exceptionally(throwable -> {
-                    throwable = unwrapCause(throwable);
+                            if (throwable instanceof TimeoutException) {
+                                LOG.info(
+                                        "Node couldn't get the leader within 
timeout so the changing peers is skipped [grp={}].",
+                                        replicaGrpId
+                                );
 
-                    if (throwable instanceof TimeoutException) {
-                        LOG.info("Node couldn't get the leader within timeout 
so the changing peers is skipped [grp={}].", replicaGrpId);
+                                return LeaderWithTerm.NO_LEADER;
+                            }
 
-                        return LeaderWithTerm.NO_LEADER;
-                    }
+                            throw new IgniteInternalException(
+                                    INTERNAL_ERR,
+                                    "Failed to get a leader for the RAFT 
replication group [get=" + replicaGrpId + "].",
+                                    throwable
+                            );
+                        })
+                        .thenCompose(leaderWithTerm -> {
+                            if (leaderWithTerm.isEmpty() || 
!isLocalPeer(leaderWithTerm.leader())) {
+                                return nullCompletedFuture();
+                            }
 
-                    throw new IgniteInternalException(
-                            INTERNAL_ERR,
-                            "Failed to get a leader for the RAFT replication 
group [get=" + replicaGrpId + "].",
-                            throwable
-                    );
-                })
-                .thenCompose(leaderWithTerm -> {
-                    if (leaderWithTerm.isEmpty() || 
!isLocalPeer(leaderWithTerm.leader())) {
-                        return nullCompletedFuture();
-                    }
+                            // run update of raft configuration if this node 
is a leader
+                            LOG.info("Current node={} is the leader of 
partition raft group={}. "
+                                            + "Initiate rebalance process for 
partition={}, table={}",
+                                    leaderWithTerm.leader(),
+                                    replicaGrpId,
+                                    replicaGrpId.partitionId(),
+                                    tables.get(replicaGrpId.tableId()).name()
+                            );
 
-                    // run update of raft configuration if this node is a 
leader
-                    LOG.info("Current node={} is the leader of partition raft 
group={}. "
-                                    + "Initiate rebalance process for 
partition={}, table={}",
-                            leaderWithTerm.leader(), replicaGrpId, partId, 
tables.get(replicaGrpId.tableId()).name());
-
-                    return 
metaStorageMgr.get(pendingPartAssignmentsKey(replicaGrpId))
-                            .thenCompose(latestPendingAssignmentsEntry -> {
-                                // Do not change peers of the raft group if 
this is a stale event.
-                                // Note that we start raft node before for the 
sake of the consistency in a
-                                // starting and stopping raft nodes.
-                                if (revision < 
latestPendingAssignmentsEntry.revision()) {
-                                    return nullCompletedFuture();
-                                }
+                            return 
metaStorageMgr.get(pendingPartAssignmentsKey(replicaGrpId))
+                                    .thenCompose(latestPendingAssignmentsEntry 
-> {
+                                        // Do not change peers of the raft 
group if this is a stale event.
+                                        // Note that we start raft node before 
for the sake of the consistency in a
+                                        // starting and stopping raft nodes.
+                                        if (revision < 
latestPendingAssignmentsEntry.revision()) {
+                                            return nullCompletedFuture();
+                                        }
 
-                                PeersAndLearners newConfiguration = 
fromAssignments(pendingAssignments);
+                                        PeersAndLearners newConfiguration = 
fromAssignments(pendingAssignments);
 
-                                return 
partGrpSvc.changePeersAndLearnersAsync(newConfiguration, leaderWithTerm.term());
-                            });
-                });
+                                        return 
raftClient.changePeersAndLearnersAsync(newConfiguration, leaderWithTerm.term());
+                                    });
+                        })
+                );
     }
 
     private SnapshotStorageFactory createSnapshotStorageFactory(
@@ -2397,23 +2429,11 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         }, ioExecutor).thenCompose(identity());
     }
 
-    private CompletableFuture<Boolean> 
isLocalNodeLeaseholder(ReplicationGroupId replicationGroupId) {
-        HybridTimestamp previousMetastoreSafeTime = 
metaStorageMgr.clusterTime()
-                .currentSafeTime()
-                .addPhysicalTime(-clockService.maxClockSkewMillis());
-
-        return 
executorInclinedPlacementDriver.getPrimaryReplica(replicationGroupId, 
previousMetastoreSafeTime)
-                .thenApply(replicaMeta -> replicaMeta != null
-                    && replicaMeta.getLeaseholderId() != null
-                    && 
replicaMeta.getLeaseholderId().equals(localNode().name()));
-    }
-
     private CompletableFuture<Void> updatePartitionClients(
             TablePartitionId tablePartitionId,
-            Set<Assignment> stableAssignments,
-            long revision
+            Set<Assignment> stableAssignments
     ) {
-        return 
isLocalNodeLeaseholder(tablePartitionId).thenCompose(isLeaseholder -> 
inBusyLock(busyLock, () -> {
+        return 
isLocalNodeIsPrimary(tablePartitionId).thenCompose(isLeaseholder -> 
inBusyLock(busyLock, () -> {
             boolean isLocalInStable = 
isLocalNodeInAssignments(stableAssignments);
 
             if (!isLocalInStable && !isLeaseholder) {
@@ -2421,16 +2441,13 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
             }
 
             assert replicaMgr.isReplicaStarted(tablePartitionId)
-                    : "The local node is outside of the replication group 
[inStable=" + isLocalInStable
+                    : "The local node is outside of the replication group 
[stable=" + stableAssignments
                             + ", isLeaseholder=" + isLeaseholder + "].";
 
             // Update raft client peers and learners according to the actual 
assignments.
-            return tablesById(revision).thenAccept(t -> 
t.get(tablePartitionId.tableId())
-                    .internalTable()
-                    .tableRaftService()
-                    .partitionRaftGroupService(tablePartitionId.partitionId())
-                    .updateConfiguration(fromAssignments(stableAssignments))
-            );
+            return replicaMgr.replica(tablePartitionId)
+                    .thenApply(Replica::raftClient)
+                    .thenAccept(raftClient -> 
raftClient.updateConfiguration(fromAssignments(stableAssignments)));
         }));
     }
 
@@ -2444,7 +2461,7 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         CompletableFuture<Void> clientUpdateFuture = isRecovery
                 // Updating clients is not needed on recovery.
                 ? nullCompletedFuture()
-                : updatePartitionClients(tablePartitionId, stableAssignments, 
revision);
+                : updatePartitionClients(tablePartitionId, stableAssignments);
 
         boolean shouldStopLocalServices = (pendingAssignments.force()
                         ? pendingAssignments.nodes().stream()
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 715ec99e94..c46521c8f5 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -203,9 +203,6 @@ public class InternalTableImpl implements InternalTable {
     /** Map update guarded by {@link #updatePartitionMapsMux}. */
     private volatile Int2ObjectMap<PendingComparableValuesTracker<Long, Void>> 
storageIndexTrackerByPartitionId = emptyMap();
 
-    /** Table raft service. */
-    private final TableRaftServiceImpl tableRaftService;
-
     /** Implicit transaction timeout. */
     private final long implicitTransactionTimeout;
 
@@ -225,7 +222,6 @@ public class InternalTableImpl implements InternalTable {
      * @param replicaSvc Replica service.
      * @param clock A hybrid logical clock.
      * @param placementDriver Placement driver.
-     * @param tableRaftService Table raft service.
      * @param transactionInflights Transaction inflights.
      * @param implicitTransactionTimeout Implicit transaction timeout.
      * @param attemptsObtainLock Attempts to take lock.
@@ -242,7 +238,6 @@ public class InternalTableImpl implements InternalTable {
             HybridClock clock,
             HybridTimestampTracker observableTimestampTracker,
             PlacementDriver placementDriver,
-            TableRaftServiceImpl tableRaftService,
             TransactionInflights transactionInflights,
             long implicitTransactionTimeout,
             int attemptsObtainLock,
@@ -260,7 +255,6 @@ public class InternalTableImpl implements InternalTable {
         this.clock = clock;
         this.observableTimestampTracker = observableTimestampTracker;
         this.placementDriver = placementDriver;
-        this.tableRaftService = tableRaftService;
         this.transactionInflights = transactionInflights;
         this.implicitTransactionTimeout = implicitTransactionTimeout;
         this.attemptsObtainLock = attemptsObtainLock;
@@ -294,16 +288,9 @@ public class InternalTableImpl implements InternalTable {
 
     @Override
     public void name(String newName) {
-        tableRaftService.name(newName);
-
         this.tableName = newName;
     }
 
-    @Override
-    public TableRaftServiceImpl tableRaftService() {
-        return tableRaftService;
-    }
-
     /**
      * Enlists a single row into a transaction.
      *
@@ -2204,7 +2191,7 @@ public class InternalTableImpl implements InternalTable {
     /** {@inheritDoc} */
     @Override
     public void close() {
-        tableRaftService.close();
+        // No-op
     }
 
     // TODO: IGNITE-17963 Use smarter logic for recipient node evaluation.
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/TableRaftServiceImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/TableRaftServiceImpl.java
deleted file mode 100644
index ac0ba7d8c0..0000000000
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/TableRaftServiceImpl.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.table.distributed.storage;
-
-import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
-import it.unimi.dsi.fastutil.ints.Int2ObjectMap.Entry;
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import org.apache.ignite.internal.lang.IgniteInternalException;
-import org.apache.ignite.internal.network.ClusterNodeResolver;
-import org.apache.ignite.internal.raft.Peer;
-import org.apache.ignite.internal.raft.service.RaftGroupService;
-import org.apache.ignite.internal.table.TableRaftService;
-import org.apache.ignite.network.ClusterNode;
-import org.jetbrains.annotations.TestOnly;
-
-/**
- * Table's raft client storage.
- */
-public class TableRaftServiceImpl implements TableRaftService {
-
-    /** Mutex for the partition maps update. */
-    private final Object updatePartitionMapsMux = new Object();
-
-    /** Map update guarded by {@link #updatePartitionMapsMux}. */
-    private volatile Int2ObjectMap<RaftGroupService> 
raftGroupServiceByPartitionId;
-
-    /** Resolver that resolves a node consistent ID to cluster node. */
-    private final ClusterNodeResolver clusterNodeResolver;
-
-    /** Partitions. */
-    private final int partitions;
-
-    /** Table name. */
-    private volatile String tableName;
-
-    /**
-     * Constructor.
-     *
-     * @param tableName Table name.
-     * @param partitions Partitions.
-     * @param partMap Map partition id to raft group.
-     * @param clusterNodeResolver Cluster node resolver.
-     */
-    public TableRaftServiceImpl(
-            String tableName,
-            int partitions,
-            Int2ObjectMap<RaftGroupService> partMap,
-            ClusterNodeResolver clusterNodeResolver
-    ) {
-        this.tableName = tableName;
-        this.partitions = partitions;
-        this.raftGroupServiceByPartitionId = partMap;
-        this.clusterNodeResolver = clusterNodeResolver;
-    }
-
-    @Override
-    public ClusterNode leaderAssignment(int partition) {
-        awaitLeaderInitialization();
-
-        RaftGroupService raftGroupService = 
raftGroupServiceByPartitionId.get(partition);
-        if (raftGroupService == null) {
-            throw new IgniteInternalException("No such partition " + partition 
+ " in table " + tableName);
-        }
-
-        return 
clusterNodeResolver.getByConsistentId(raftGroupService.leader().consistentId());
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public RaftGroupService partitionRaftGroupService(int partition) {
-        RaftGroupService raftGroupService = 
raftGroupServiceByPartitionId.get(partition);
-        if (raftGroupService == null) {
-            throw new IgniteInternalException("No such partition " + partition 
+ " in table " + tableName);
-        }
-
-        return raftGroupService;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public void close() {
-        for (RaftGroupService srv : raftGroupServiceByPartitionId.values()) {
-            srv.shutdown();
-        }
-    }
-
-    public void name(String newName) {
-        this.tableName = newName;
-    }
-
-    /**
-     * Updates internal table raft group service for given partition.
-     *
-     * @param p Partition.
-     * @param raftGrpSvc Raft group service.
-     */
-    public void updateInternalTableRaftGroupService(int p, RaftGroupService 
raftGrpSvc) {
-        RaftGroupService oldSrvc;
-
-        synchronized (updatePartitionMapsMux) {
-            Int2ObjectMap<RaftGroupService> newPartitionMap = new 
Int2ObjectOpenHashMap<>(partitions);
-
-            newPartitionMap.putAll(raftGroupServiceByPartitionId);
-
-            oldSrvc = newPartitionMap.put(p, raftGrpSvc);
-
-            raftGroupServiceByPartitionId = newPartitionMap;
-        }
-
-        if (oldSrvc != null && oldSrvc != raftGrpSvc) {
-            oldSrvc.shutdown();
-        }
-    }
-
-    /**
-     * Returns map of partition -> list of peers and learners of that 
partition.
-     */
-    @TestOnly
-    public Map<Integer, List<String>> peersAndLearners() {
-        awaitLeaderInitialization();
-
-        return raftGroupServiceByPartitionId.int2ObjectEntrySet().stream()
-                .collect(Collectors.toMap(Entry::getIntKey, e -> {
-                    RaftGroupService service = e.getValue();
-                    return Stream.of(service.peers(), service.learners())
-                            .filter(Objects::nonNull)
-                            .flatMap(Collection::stream)
-                            .map(Peer::consistentId)
-                            .collect(Collectors.toList());
-                }));
-    }
-
-    private void awaitLeaderInitialization() {
-        List<CompletableFuture<Void>> futs = new ArrayList<>();
-
-        for (RaftGroupService raftSvc : 
raftGroupServiceByPartitionId.values()) {
-            if (raftSvc.leader() == null) {
-                futs.add(raftSvc.refreshLeader());
-            }
-        }
-
-        CompletableFuture.allOf(futs.toArray(CompletableFuture[]::new)).join();
-    }
-}
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index 89fad7875a..b3a9d13c8c 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -286,7 +286,7 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
         when(topologyService.localMember()).thenReturn(node);
         when(distributionZoneManager.dataNodes(anyLong(), anyInt(), 
anyInt())).thenReturn(emptySetCompletedFuture());
 
-        when(replicaMgr.startReplica(any(), any(), any(), any(), 
any(PendingComparableValuesTracker.class), any()))
+        when(replicaMgr.startReplica(any(), any(), any(), 
any(PendingComparableValuesTracker.class), any()))
                 .thenReturn(nullCompletedFuture());
         when(replicaMgr.stopReplica(any())).thenReturn(trueCompletedFuture());
         when(replicaMgr.weakStartReplica(any(), any(), 
any())).thenReturn(trueCompletedFuture());
@@ -364,7 +364,9 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
                         lowWatermark,
                         ForkJoinPool.commonPool(),
                         mock(ScheduledExecutorService.class),
-                        partitionOperationsExecutor
+                        partitionOperationsExecutor,
+                        clockService,
+                        placementDriver
                 )
         ) {
 
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index bd5238a2fd..0f184e46f8 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -294,7 +294,7 @@ public class TableManagerTest extends IgniteAbstractTest {
 
         when(distributionZoneManager.dataNodes(anyLong(), anyInt(), 
anyInt())).thenReturn(emptySetCompletedFuture());
 
-        when(replicaMgr.startReplica(any(), any(), anyBoolean(), any(), any(), 
any(), any(), any(), any()))
+        when(replicaMgr.startReplica(any(), any(), anyBoolean(), any(), any(), 
any(), any(), any()))
                 .thenReturn(nullCompletedFuture());
         when(replicaMgr.stopReplica(any())).thenReturn(trueCompletedFuture());
         when(replicaMgr.weakStartReplica(any(), any(), any())).thenAnswer(inv 
-> {
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
index 02f2146d46..d9d53b08fc 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
@@ -148,7 +148,6 @@ public class InternalTableEstimatedSizeTest extends 
BaseIgniteAbstractTest {
             @Mock MvTableStorage tableStorage,
             @Mock TxStateTableStorage txStateTableStorage,
             @Mock TxStateStorage txStateStorage,
-            @Mock TableRaftServiceImpl tableRaftService,
             @Mock TransactionStateResolver transactionStateResolver,
             @Mock StorageUpdateHandler storageUpdateHandler,
             @Mock ValidationSchemasSource validationSchemasSource,
@@ -202,7 +201,6 @@ public class InternalTableEstimatedSizeTest extends 
BaseIgniteAbstractTest {
                 clock,
                 new HybridTimestampTracker(),
                 placementDriver,
-                tableRaftService,
                 new TransactionInflights(placementDriver, clockService),
                 0,
                 0,
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
index 244687570a..1fef021d82 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
@@ -33,7 +33,6 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
-import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.ignite.internal.hlc.HybridClock;
@@ -72,7 +71,6 @@ public class InternalTableImplTest extends 
BaseIgniteAbstractTest {
                 mock(HybridClock.class),
                 new HybridTimestampTracker(),
                 mock(PlacementDriver.class),
-                new TableRaftServiceImpl("test", 1, Int2ObjectMaps.emptyMap(), 
new SingleClusterNodeResolver(mock(ClusterNode.class))),
                 mock(TransactionInflights.class),
                 3_000,
                 0,
@@ -123,7 +121,6 @@ public class InternalTableImplTest extends 
BaseIgniteAbstractTest {
                 mock(HybridClock.class),
                 new HybridTimestampTracker(),
                 mock(PlacementDriver.class),
-                new TableRaftServiceImpl("test", 3, Int2ObjectMaps.emptyMap(), 
new SingleClusterNodeResolver(mock(ClusterNode.class))),
                 mock(TransactionInflights.class),
                 3_000,
                 0,
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index c7c91c4430..9ae2390a08 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -137,7 +137,6 @@ import 
org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
 import 
org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller;
 import 
org.apache.ignite.internal.table.distributed.schema.ValidationSchemasSource;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
-import 
org.apache.ignite.internal.table.distributed.storage.TableRaftServiceImpl;
 import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
 import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
 import org.apache.ignite.internal.table.impl.DummyValidationSchemasSource;
@@ -768,7 +767,6 @@ public class ItTxTestCluster {
                         startClient ? clientClock : clocks.get(localNodeName),
                         timestampTracker,
                         placementDriver,
-                        new TableRaftServiceImpl(tableName, 1, clients, 
nodeResolver),
                         clientTransactionInflights,
                         500,
                         0,
@@ -1087,4 +1085,8 @@ public class ItTxTestCluster {
     public Map<String, ReplicaManager> replicaManagers() {
         return replicaManagers;
     }
+
+    public Map<String, ClusterService> clusterServices() {
+        return clusterServices;
+    }
 }
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
index 1c6a148b13..0cd93893c6 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -80,6 +80,7 @@ import org.apache.ignite.internal.replicator.Replica;
 import org.apache.ignite.internal.replicator.ReplicaImpl;
 import org.apache.ignite.internal.replicator.ReplicaManager;
 import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.ReplicaTestUtils;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import 
org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration;
@@ -1686,7 +1687,12 @@ public abstract class TxAbstractTest extends 
IgniteAbstractTest {
                         0,
                         internalTx.id(),
                         internalTx.readTimestamp(),
-                        internalTable.tableRaftService().leaderAssignment(0),
+                        ReplicaTestUtils.leaderAssignment(
+                                
txTestCluster.replicaManagers().get(txTestCluster.localNodeName()),
+                                
txTestCluster.clusterServices().get(txTestCluster.localNodeName()).topologyService(),
+                                internalTable.tableId(),
+                                0
+                        ),
                         internalTx.coordinatorId()
                 )
                 : internalTable.scan(0, internalTx);
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 2d733958a0..5f747189e7 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -30,7 +30,6 @@ import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
@@ -99,7 +98,6 @@ import 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaL
 import 
org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
 import 
org.apache.ignite.internal.table.distributed.schema.AlwaysSyncedSchemaSyncService;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
-import 
org.apache.ignite.internal.table.distributed.storage.TableRaftServiceImpl;
 import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.TxManager;
@@ -173,6 +171,7 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
     ) {
         this(
                 replicaSvc,
+                new TestPlacementDriver(LOCAL_NODE),
                 new TestMvPartitionStorage(0),
                 schema,
                 txConfiguration,
@@ -184,6 +183,7 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
      * Creates a new local table.
      *
      * @param replicaSvc Replica service.
+     * @param placementDriver Placement driver.
      * @param storage Storage.
      * @param schema Schema.
      * @param txConfiguration Transaction configuration.
@@ -191,6 +191,7 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
      */
     public DummyInternalTableImpl(
             ReplicaService replicaSvc,
+            PlacementDriver placementDriver,
             MvPartitionStorage storage,
             SchemaDescriptor schema,
             TransactionConfiguration txConfiguration,
@@ -203,11 +204,11 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
                 null,
                 schema,
                 new HybridTimestampTracker(),
-                new TestPlacementDriver(LOCAL_NODE),
+                placementDriver,
                 storageUpdateConfiguration,
                 txConfiguration,
                 new RemotelyTriggeredResourceRegistry(),
-                new TransactionInflights(new TestPlacementDriver(LOCAL_NODE), 
CLOCK_SERVICE)
+                new TransactionInflights(placementDriver, CLOCK_SERVICE)
         );
     }
 
@@ -249,12 +250,6 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
                 CLOCK,
                 tracker,
                 placementDriver,
-                new TableRaftServiceImpl(
-                        "test",
-                        1,
-                        Int2ObjectMaps.singleton(PART_ID, 
mock(RaftGroupService.class)),
-                        new SingleClusterNodeResolver(LOCAL_NODE)
-                ),
                 transactionInflights,
                 3_000,
                 0,
@@ -262,7 +257,7 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
                 mock(StreamerReceiverRunner.class)
         );
 
-        RaftGroupService svc = 
tableRaftService().partitionRaftGroupService(PART_ID);
+        RaftGroupService svc = mock(RaftGroupService.class);
 
         groupId = crossTableUsage ? new TablePartitionId(tableId(), PART_ID) : 
crossTableGroupId;
 
@@ -388,7 +383,7 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
 
         replicaListener = new PartitionReplicaListener(
                 mvPartStorage,
-                tableRaftService().partitionRaftGroupService(PART_ID),
+                svc,
                 this.txManager,
                 this.txManager.lockManager(),
                 Runnable::run,
@@ -406,7 +401,7 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
                 LOCAL_NODE,
                 new AlwaysSyncedSchemaSyncService(),
                 catalogService,
-                new TestPlacementDriver(LOCAL_NODE),
+                placementDriver,
                 mock(ClusterNodeResolver.class),
                 resourcesRegistry,
                 schemaManager,

Reply via email to