This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 6a2a2e6a2d IGNITE-18172 Added learners' assignments to the rebalance 
algorithm (#1366)
6a2a2e6a2d is described below

commit 6a2a2e6a2dd0772122e32c3858b7ba8b30b70742
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Mon Nov 28 16:04:24 2022 +0300

    IGNITE-18172 Added learners' assignments to the rebalance algorithm (#1366)
---
 .../ignite/internal/affinity/AffinityUtils.java    |  57 +--
 .../ignite/internal/affinity/Assignment.java       |  96 ++++
 .../internal/affinity/AffinityServiceTest.java     |   9 +-
 .../java/org/apache/ignite/internal/raft/Loza.java |  70 ++-
 .../org/apache/ignite/internal/raft/LozaTest.java  |   2 +-
 .../ignite/internal/replicator/ReplicaManager.java |  13 +-
 .../storage/ItRebalanceDistributedTest.java        |  10 +-
 .../sql/engine/exec/MockedStructuresTest.java      |   2 +-
 .../distributed/ItTxDistributedTestSingleNode.java | 136 +++--
 .../internal/table/distributed/TableManager.java   | 561 +++++++++++----------
 .../raft/RebalanceRaftGroupEventsListener.java     | 102 ++--
 .../distributed/replicator/LeaderOrTxState.java    |  15 +-
 .../replicator/PartitionReplicaListener.java       |   9 +-
 .../distributed/replicator/PlacementDriver.java    |  35 +-
 .../ignite/internal/utils/RebalanceUtil.java       | 101 +---
 .../table/distributed/TableManagerTest.java        |  34 +-
 .../PartitionReplicaListenerIndexLockingTest.java  |   2 -
 .../replication/PartitionReplicaListenerTest.java  |   7 +-
 .../table/impl/DummyInternalTableImpl.java         |   1 -
 19 files changed, 632 insertions(+), 630 deletions(-)

diff --git 
a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityUtils.java
 
b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityUtils.java
index d0c1335bae..ec15cedc02 100644
--- 
a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityUtils.java
+++ 
b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityUtils.java
@@ -17,14 +17,15 @@
 
 package org.apache.ignite.internal.affinity;
 
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.function.IntFunction;
 import org.apache.ignite.network.ClusterNode;
-import org.jetbrains.annotations.NotNull;
 
 /**
  * Stateless affinity utils that produces helper methods for an affinity 
assignments calculation.
@@ -35,59 +36,31 @@ public class AffinityUtils {
      *
      * @param partitions Partitions count.
      * @param replicas Replicas count.
-     * @param aggregator Function that creates a collection for the partition 
assignments.
-     * @return List nodes by partition.
+     * @return List of assignments by partition.
      */
-    public static <T extends Collection<ClusterNode>> List<T> 
calculateAssignments(
-            @NotNull Collection<ClusterNode> baselineNodes,
-            int partitions,
-            int replicas,
-            IntFunction<T> aggregator
-    ) {
-        return RendezvousAffinityFunction.assignPartitions(
+    public static List<Set<Assignment>> 
calculateAssignments(Collection<ClusterNode> baselineNodes, int partitions, int 
replicas) {
+        List<Set<ClusterNode>> affinityNodes = 
RendezvousAffinityFunction.assignPartitions(
                 baselineNodes,
                 partitions,
                 replicas,
                 false,
                 null,
-                aggregator
+                HashSet::new
         );
-    }
 
-    /**
-     * Calculates affinity assignments.
-     *
-     * @param partitions Partitions count.
-     * @param replicas Replicas count.
-     * @return List nodes by partition.
-     */
-    public static List<List<ClusterNode>> calculateAssignments(
-            @NotNull Collection<ClusterNode> baselineNodes,
-            int partitions,
-            int replicas
-    ) {
-        return calculateAssignments(
-                baselineNodes,
-                partitions,
-                replicas,
-                ArrayList::new
-        );
+        return 
affinityNodes.stream().map(AffinityUtils::clusterNodesToAssignments).collect(toList());
     }
 
     /**
-     * Calculates affinity assignments for single partition.
+     * Calculates affinity assignments for a single partition.
      *
      * @param baselineNodes Nodes.
      * @param partition Partition id.
      * @param replicas Replicas count.
-     * @return List of nodes.
+     * @return Set of assignments.
      */
-    public static Set<ClusterNode> calculateAssignmentForPartition(
-            Collection<ClusterNode> baselineNodes,
-            int partition,
-            int replicas
-    ) {
-        return RendezvousAffinityFunction.assignPartition(
+    public static Set<Assignment> 
calculateAssignmentForPartition(Collection<ClusterNode> baselineNodes, int 
partition, int replicas) {
+        Set<ClusterNode> affinityNodes = 
RendezvousAffinityFunction.assignPartition(
                 partition,
                 new ArrayList<>(baselineNodes),
                 replicas,
@@ -96,5 +69,11 @@ public class AffinityUtils {
                 null,
                 HashSet::new
         );
+
+        return clusterNodesToAssignments(affinityNodes);
+    }
+
+    private static Set<Assignment> 
clusterNodesToAssignments(Collection<ClusterNode> nodes) {
+        return nodes.stream().map(node -> 
Assignment.forPeer(node.name())).collect(toSet());
     }
 }
diff --git 
a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignment.java
 
b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignment.java
new file mode 100644
index 0000000000..e26ad97066
--- /dev/null
+++ 
b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignment.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.affinity;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Represents an assignment of a partition to a node with a specific {@code 
consistentId}.
+ *
+ * <p>There can be two types of assignments: one for the synchronous members 
of a replication group (a.k.a. "peers") and one for
+ * the asynchronous members (a.k.a. "learners") of the same group. Peers get 
synchronously updated during write operations, while learners
+ * are eventually consistent and receive updates some time in the future.
+ */
+public class Assignment implements Serializable {
+    private static final long serialVersionUID = -8892379245627437834L;
+
+    private final String consistentId;
+
+    private final boolean isPeer;
+
+    private Assignment(String consistentId, boolean isPeer) {
+        this.consistentId = consistentId;
+        this.isPeer = isPeer;
+    }
+
+    /**
+     * Creates a peer assignment.
+     *
+     * @param consistentId Peer consistent ID.
+     */
+    public static Assignment forPeer(String consistentId) {
+        return new Assignment(consistentId, true);
+    }
+
+    /**
+     * Creates a learner assignment.
+     *
+     * @param consistentId Learner consistent ID.
+     */
+    public static Assignment forLearner(String consistentId) {
+        return new Assignment(consistentId, false);
+    }
+
+    public String consistentId() {
+        return consistentId;
+    }
+
+    public boolean isPeer() {
+        return isPeer;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        Assignment that = (Assignment) o;
+
+        if (isPeer != that.isPeer) {
+            return false;
+        }
+        return consistentId.equals(that.consistentId);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = consistentId.hashCode();
+        result = 31 * result + (isPeer ? 1 : 0);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return S.toString(Assignment.class, this);
+    }
+}
diff --git 
a/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/AffinityServiceTest.java
 
b/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/AffinityServiceTest.java
index d93a3663f9..5310ab1dcc 100644
--- 
a/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/AffinityServiceTest.java
+++ 
b/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/AffinityServiceTest.java
@@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.NetworkAddress;
@@ -34,7 +35,7 @@ import org.junit.jupiter.api.Test;
 public class AffinityServiceTest {
     @Test
     public void testCalculatedAssignmentHappyPath() {
-        List<List<ClusterNode>> assignments = 
AffinityUtils.calculateAssignments(
+        List<Set<Assignment>> assignments = AffinityUtils.calculateAssignments(
                 Arrays.asList(
                         new ClusterNode(
                                 UUID.randomUUID().toString(), "node0",
@@ -51,14 +52,14 @@ public class AffinityServiceTest {
 
         assertEquals(10, assignments.size());
 
-        for (List<ClusterNode> partitionAssignment : assignments) {
+        for (Set<Assignment> partitionAssignment : assignments) {
             assertEquals(2, partitionAssignment.size());
         }
     }
 
     @Test
     public void testEmptyBaselineAssignmentsCalculation() {
-        List<List<ClusterNode>> assignments = 
AffinityUtils.calculateAssignments(
+        List<Set<Assignment>> assignments = AffinityUtils.calculateAssignments(
                 Collections.emptyList(),
                 10,
                 3
@@ -66,7 +67,7 @@ public class AffinityServiceTest {
 
         assertEquals(10, assignments.size());
 
-        for (List<ClusterNode> partitionAssignment : assignments) {
+        for (Set<Assignment> partitionAssignment : assignments) {
             assertEquals(0, partitionAssignment.size());
         }
     }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index 3f69e670b5..51e0793c96 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -49,7 +49,6 @@ import 
org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteStringFormatter;
 import org.apache.ignite.lang.NodeStoppingException;
-import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.MessagingService;
 import org.apache.ignite.network.TopologyService;
@@ -170,30 +169,12 @@ public class Loza implements IgniteComponent {
         raftServer.stop();
     }
 
-    /**
-     * Determines whether a RAFT group should be started locally according to 
a collection of nodes that should have a RAFT group.
-     */
-    public boolean shouldHaveRaftGroupLocally(Collection<ClusterNode> 
raftNodes) {
-        String locNodeName = 
clusterNetSvc.topologyService().localMember().name();
-
-        return raftNodes.stream().anyMatch(n -> locNodeName.equals(n.name()));
-    }
-
-    /**
-     * Determines whether a RAFT group should be started locally according to 
a collection of nodes that should have a RAFT group.
-     */
-    private boolean shouldHaveRaftGroupLocallyImpl(Collection<String> 
raftNodes) {
-        String locNodeName = 
clusterNetSvc.topologyService().localMember().name();
-
-        return raftNodes.stream().anyMatch(locNodeName::equals);
-    }
-
     /**
      * Creates a raft group service providing operations on a raft group. If 
{@code nodes} contains the current node, then raft group starts
      * on the current node.
      *
      * @param groupId Raft group id.
-     * @param nodeConsistentIds Consistent IDs of Raft group nodes.
+     * @param peerConsistentIds Consistent IDs of Raft peers.
      * @param lsnrSupplier Raft group listener supplier.
      * @param groupOptions Options to apply to the group.
      * @return Future representing pending completion of the operation.
@@ -201,20 +182,20 @@ public class Loza implements IgniteComponent {
      */
     public CompletableFuture<RaftGroupService> prepareRaftGroup(
             ReplicationGroupId groupId,
-            Collection<String> nodeConsistentIds,
+            Collection<String> peerConsistentIds,
             Supplier<RaftGroupListener> lsnrSupplier,
             RaftGroupOptions groupOptions
     ) throws NodeStoppingException {
-        return prepareRaftGroup(groupId, nodeConsistentIds, List.of(), 
lsnrSupplier, () -> noopLsnr, groupOptions);
+        return prepareRaftGroup(groupId, peerConsistentIds, List.of(), 
lsnrSupplier, () -> noopLsnr, groupOptions);
     }
 
     /**
-     * Creates a raft group service providing operations on a raft group. If 
{@code nodeConsistentIds} or {@code learnerConsistentIds}
+     * Creates a raft group service providing operations on a raft group. If 
{@code peerConsistentIds} or {@code learnerConsistentIds}
      * contains the current node, then raft group starts on the current node.
      *
      * @param groupId Raft group id.
-     * @param nodeConsistentIds Consistent IDs of Raft group nodes.
-     * @param learnerConsistentIds Consistent IDs of Raft learner nodes.
+     * @param peerConsistentIds Consistent IDs of Raft peers.
+     * @param learnerConsistentIds Consistent IDs of Raft learners.
      * @param lsnrSupplier Raft group listener supplier.
      * @param raftGrpEvtsLsnrSupplier Raft group events listener supplier.
      * @param groupOptions Options to apply to the group.
@@ -223,7 +204,7 @@ public class Loza implements IgniteComponent {
      */
     public CompletableFuture<RaftGroupService> prepareRaftGroup(
             ReplicationGroupId groupId,
-            Collection<String> nodeConsistentIds,
+            Collection<String> peerConsistentIds,
             Collection<String> learnerConsistentIds,
             Supplier<RaftGroupListener> lsnrSupplier,
             Supplier<RaftGroupEventsListener> raftGrpEvtsLsnrSupplier,
@@ -235,7 +216,7 @@ public class Loza implements IgniteComponent {
 
         try {
             return prepareRaftGroupInternal(
-                    groupId, nodeConsistentIds, learnerConsistentIds, 
lsnrSupplier, raftGrpEvtsLsnrSupplier, groupOptions
+                    groupId, peerConsistentIds, learnerConsistentIds, 
lsnrSupplier, raftGrpEvtsLsnrSupplier, groupOptions
             );
         } finally {
             busyLock.leaveBusy();
@@ -246,8 +227,8 @@ public class Loza implements IgniteComponent {
      * Internal method to a raft group creation.
      *
      * @param groupId Raft group id.
-     * @param nodeConsistentIds Consistent IDs of Raft group nodes.
-     * @param learnerConsistentIds Consistent IDs of Raft learner nodes.
+     * @param peerConsistentIds Consistent IDs of Raft peers.
+     * @param learnerConsistentIds Consistent IDs of Raft learners.
      * @param lsnrSupplier Raft group listener supplier.
      * @param eventsLsnrSupplier Raft group events listener supplier.
      * @param groupOptions Options to apply to the group.
@@ -255,16 +236,18 @@ public class Loza implements IgniteComponent {
      */
     private CompletableFuture<RaftGroupService> prepareRaftGroupInternal(
             ReplicationGroupId groupId,
-            Collection<String> nodeConsistentIds,
+            Collection<String> peerConsistentIds,
             Collection<String> learnerConsistentIds,
             Supplier<RaftGroupListener> lsnrSupplier,
             Supplier<RaftGroupEventsListener> eventsLsnrSupplier,
             RaftGroupOptions groupOptions
     ) {
-        List<Peer> peers = idsToPeers(nodeConsistentIds);
+        List<Peer> peers = idsToPeers(peerConsistentIds);
         List<Peer> learners = idsToPeers(learnerConsistentIds);
 
-        if (shouldHaveRaftGroupLocallyImpl(nodeConsistentIds) || 
shouldHaveRaftGroupLocallyImpl(learnerConsistentIds)) {
+        String locNodeName = 
clusterNetSvc.topologyService().localMember().name();
+
+        if (peerConsistentIds.contains(locNodeName) || 
learnerConsistentIds.contains(locNodeName)) {
             startRaftGroupNodeInternal(
                     groupId,
                     peers,
@@ -282,7 +265,8 @@ public class Loza implements IgniteComponent {
      * Start RAFT group on the current node.
      *
      * @param grpId Raft group id.
-     * @param nodeConsistentIds Consistent IDs of Raft group nodes.
+     * @param peerConsistentIds Consistent IDs of Raft peers.
+     * @param learnerConsistentIds Consistent IDs of Raft learners.
      * @param lsnr Raft group listener.
      * @param eventsLsnr Raft group events listener.
      * @param groupOptions Options to apply to the group.
@@ -290,7 +274,8 @@ public class Loza implements IgniteComponent {
      */
     public void startRaftGroupNode(
             ReplicationGroupId grpId,
-            Collection<String> nodeConsistentIds,
+            Collection<String> peerConsistentIds,
+            Collection<String> learnerConsistentIds,
             RaftGroupListener lsnr,
             RaftGroupEventsListener eventsLsnr,
             RaftGroupOptions groupOptions
@@ -300,7 +285,14 @@ public class Loza implements IgniteComponent {
         }
 
         try {
-            startRaftGroupNodeInternal(grpId, idsToPeers(nodeConsistentIds), 
List.of(), lsnr, eventsLsnr, groupOptions);
+            startRaftGroupNodeInternal(
+                    grpId,
+                    idsToPeers(peerConsistentIds),
+                    idsToPeers(learnerConsistentIds),
+                    lsnr,
+                    eventsLsnr,
+                    groupOptions
+            );
         } finally {
             busyLock.leaveBusy();
         }
@@ -310,20 +302,22 @@ public class Loza implements IgniteComponent {
      * Creates and starts a raft group service providing operations on a raft 
group.
      *
      * @param grpId RAFT group id.
-     * @param nodeConsistentIds Consistent IDs of Raft group nodes.
+     * @param peerConsistentIds Consistent IDs of Raft peers.
+     * @param learnerConsistentIds Consistent IDs of Raft learners.
      * @return Future that will be completed with an instance of RAFT group 
service.
      * @throws NodeStoppingException If node stopping intention was detected.
      */
     public CompletableFuture<RaftGroupService> startRaftGroupService(
             ReplicationGroupId grpId,
-            Collection<String> nodeConsistentIds
+            Collection<String> peerConsistentIds,
+            Collection<String> learnerConsistentIds
     ) throws NodeStoppingException {
         if (!busyLock.enterBusy()) {
             throw new NodeStoppingException();
         }
 
         try {
-            return startRaftGroupServiceInternal(grpId, 
idsToPeers(nodeConsistentIds), List.of());
+            return startRaftGroupServiceInternal(grpId, 
idsToPeers(peerConsistentIds), idsToPeers(learnerConsistentIds));
         } finally {
             busyLock.leaveBusy();
         }
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
index bbedb9debf..54039ed9b9 100644
--- a/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
@@ -79,7 +79,7 @@ public class LozaTest extends IgniteAbstractTest {
 
         assertThrows(
                 NodeStoppingException.class,
-                () -> loza.startRaftGroupService(raftGroupId, newNodes)
+                () -> loza.startRaftGroupService(raftGroupId, newNodes, 
List.of())
         );
         assertThrows(NodeStoppingException.class, () -> 
loza.stopRaftGroup(raftGroupId));
         assertThrows(
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 781ff2b1e5..46849de85c 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.replicator;
 
 import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
 
-import java.util.Collection;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -90,7 +89,7 @@ public class ReplicaManager implements IgniteComponent {
             Executors.newScheduledThreadPool(1, new 
NamedThreadFactory("scheduled-idle-safe-time-sync-thread", LOG));
 
     /** Set of message groups to handler as replica requests. */
-    Set<Class<?>> messageGroupsToHandle;
+    private final Set<Class<?>> messageGroupsToHandle;
 
     /**
      * Constructor for a    replica service.
@@ -272,16 +271,6 @@ public class ReplicaManager implements IgniteComponent {
         assert replicas.isEmpty() : "There are replicas alive [replicas=" + 
replicas.keySet() + ']';
     }
 
-    /**
-     * Determines whether a replication group should be started locally
-     * according to a collection of nodes that should have a replication group.
-     */
-    public boolean shouldHaveReplicationGroupLocally(Collection<ClusterNode> 
replicas) {
-        String locNodeName = 
clusterNetSvc.topologyService().localMember().name();
-
-        return replicas.stream().anyMatch(r -> locNodeName.equals(r.name()));
-    }
-
     /**
      * Extract a hybrid timestamp from timestamp aware request or return null.
      */
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
index 314e340173..b626ab21a2 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
@@ -38,6 +38,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import 
org.apache.ignite.client.handler.configuration.ClientConnectorConfiguration;
+import org.apache.ignite.internal.affinity.Assignment;
 import org.apache.ignite.internal.baseline.BaselineManager;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import 
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
@@ -99,7 +100,6 @@ import org.apache.ignite.internal.util.ReverseIterator;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
 import org.apache.ignite.lang.IgniteInternalException;
-import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.StaticNodeFinder;
@@ -279,7 +279,7 @@ public class ItRebalanceDistributedTest {
                         .changePartitions(1)));
 
         Set<String> partitionNodesConsistentIds = getPartitionClusterNodes(0, 
0).stream()
-                .map(ClusterNode::name)
+                .map(Assignment::consistentId)
                 .collect(Collectors.toSet());
 
         Node newNode = nodes.stream().filter(n -> 
!partitionNodesConsistentIds.contains(n.name)).findFirst().orElseThrow();
@@ -391,15 +391,15 @@ public class ItRebalanceDistributedTest {
         return nodes.stream().filter(n -> 
n.name.equals(consistentId)).findFirst().orElseThrow();
     }
 
-    private Set<ClusterNode> getPartitionClusterNodes(int nodeNum, int 
partNum) {
+    private Set<Assignment> getPartitionClusterNodes(int nodeNum, int partNum) 
{
         var table = ((ExtendedTableConfiguration) 
nodes.get(nodeNum).clusterCfgMgr.configurationRegistry()
                 
.getConfiguration(TablesConfiguration.KEY).tables().get("TBL1"));
 
         if (table != null) {
-            var assignments = table.assignments().value();
+            byte[] assignments = table.assignments().value();
 
             if (assignments != null) {
-                return ((List<Set<ClusterNode>>) 
ByteUtils.fromBytes(assignments)).get(partNum);
+                return ((List<Set<Assignment>>) 
ByteUtils.fromBytes(assignments)).get(partNum);
             }
         }
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index 0a1b361918..a7c9373999 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -449,7 +449,7 @@ public class MockedStructuresTest extends 
IgniteAbstractTest {
             return completedFuture(raftGrpSrvcMock);
         });
 
-        when(rm.startRaftGroupService(any(), any())).thenAnswer(mock -> {
+        when(rm.startRaftGroupService(any(), any(), any())).thenAnswer(mock -> 
{
             RaftGroupService raftGrpSrvcMock = mock(RaftGroupService.class);
 
             when(raftGrpSrvcMock.leader()).thenReturn(new Peer("test"));
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
index efe23949f4..55fb8352e9 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.distributed;
 
 import static java.util.stream.Collectors.toList;
-import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toSet;
 import static org.apache.ignite.raft.jraft.test.TestUtils.waitForTopology;
 import static 
org.apache.ignite.utils.ClusterServiceTestUtils.findLocalAddresses;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -40,7 +40,9 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
-import org.apache.ignite.internal.affinity.RendezvousAffinityFunction;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.affinity.AffinityUtils;
+import org.apache.ignite.internal.affinity.Assignment;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.hlc.HybridClock;
@@ -94,7 +96,6 @@ import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.NodeFinder;
 import org.apache.ignite.network.StaticNodeFinder;
-import org.apache.ignite.network.TopologyService;
 import org.apache.ignite.raft.client.Peer;
 import org.apache.ignite.raft.client.service.RaftGroupService;
 import org.apache.ignite.raft.jraft.RaftMessagesFactory;
@@ -128,17 +129,15 @@ public class ItTxDistributedTestSingleNode extends 
TxAbstractTest {
 
     private ReplicaService clientReplicaSvc;
 
-    protected Map<ClusterNode, HybridClock> clocks;
+    protected Map<String, HybridClock> clocks;
 
-    protected Map<ClusterNode, Loza> raftServers;
+    protected Map<String, Loza> raftServers;
 
-    protected Map<ClusterNode, ReplicaManager> replicaManagers;
+    protected Map<String, ReplicaManager> replicaManagers;
 
-    protected Map<ClusterNode, ReplicaService> replicaServices;
+    protected Map<String, ReplicaService> replicaServices;
 
-    protected Map<ClusterNode, TxManager> txManagers;
-
-    protected Map<ClusterNode, TopologyService> topologyServices;
+    protected Map<String, TxManager> txManagers;
 
     protected Int2ObjectOpenHashMap<RaftGroupService> accRaftClients;
 
@@ -245,7 +244,6 @@ public class ItTxDistributedTestSingleNode extends 
TxAbstractTest {
         // Start raft servers. Each raft server can hold multiple groups.
         clocks = new HashMap<>(nodes);
         raftServers = new HashMap<>(nodes);
-        topologyServices = new HashMap<>(nodes);
         replicaManagers = new HashMap<>(nodes);
         replicaServices = new HashMap<>(nodes);
         txManagers = new HashMap<>(nodes);
@@ -258,7 +256,7 @@ public class ItTxDistributedTestSingleNode extends 
TxAbstractTest {
 
             HybridClock clock = new HybridClockImpl();
 
-            clocks.put(node, clock);
+            clocks.put(node.name(), clock);
 
             var raftSrv = new Loza(
                     cluster.get(i),
@@ -270,9 +268,7 @@ public class ItTxDistributedTestSingleNode extends 
TxAbstractTest {
 
             raftSrv.start();
 
-            raftServers.put(node, raftSrv);
-
-            topologyServices.put(node, cluster.get(i).topologyService());
+            raftServers.put(node.name(), raftSrv);
 
             ReplicaManager replicaMgr = new ReplicaManager(
                     cluster.get(i),
@@ -282,7 +278,7 @@ public class ItTxDistributedTestSingleNode extends 
TxAbstractTest {
 
             replicaMgr.start();
 
-            replicaManagers.put(node, replicaMgr);
+            replicaManagers.put(node.name(), replicaMgr);
 
             log.info("Replica manager has been started, node=[" + node + ']');
 
@@ -291,13 +287,13 @@ public class ItTxDistributedTestSingleNode extends 
TxAbstractTest {
                     clock
             );
 
-            replicaServices.put(node, replicaSvc);
+            replicaServices.put(node.name(), replicaSvc);
 
             TxManagerImpl txMgr = new TxManagerImpl(replicaSvc, new 
HeapLockManager(), clock);
 
             txMgr.start();
 
-            txManagers.put(node, txMgr);
+            txManagers.put(node.name(), txMgr);
         }
 
         log.info("Raft servers have been started");
@@ -313,19 +309,19 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
 
         log.info("Partition groups have been started");
 
+        String localNodeName = 
accRaftClients.get(0).clusterService().topologyService().localMember().name();
+
         TxManager txMgr;
 
         if (startClient()) {
             txMgr = new TxManagerImpl(clientReplicaSvc, new HeapLockManager(), 
clientClock);
         } else {
             // Collocated mode.
-            txMgr = 
txManagers.get(accRaftClients.get(0).clusterService().topologyService().localMember());
+            txMgr = txManagers.get(localNodeName);
         }
 
         assertNotNull(txMgr);
 
-        ClusterNode localNode = 
accRaftClients.get(0).clusterService().topologyService().localMember();
-
         igniteTransactions = new IgniteTransactionsImpl(txMgr);
 
         this.accounts = new TableImpl(new InternalTableImpl(
@@ -337,8 +333,8 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
                 txMgr,
                 Mockito.mock(MvTableStorage.class),
                 Mockito.mock(TxStateTableStorage.class),
-                startClient() ? clientReplicaSvc : 
replicaServices.get(localNode),
-                startClient() ? clientClock : clocks.get(localNode)
+                startClient() ? clientReplicaSvc : 
replicaServices.get(localNodeName),
+                startClient() ? clientClock : clocks.get(localNodeName)
         ), new DummySchemaManagerImpl(ACCOUNTS_SCHEMA), txMgr.lockManager());
 
         this.customers = new TableImpl(new InternalTableImpl(
@@ -350,8 +346,8 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
                 txMgr,
                 Mockito.mock(MvTableStorage.class),
                 Mockito.mock(TxStateTableStorage.class),
-                startClient() ? clientReplicaSvc : 
replicaServices.get(localNode),
-                startClient() ? clientClock : clocks.get(localNode)
+                startClient() ? clientReplicaSvc : 
replicaServices.get(localNodeName),
+                startClient() ? clientClock : clocks.get(localNodeName)
         ), new DummySchemaManagerImpl(CUSTOMERS_SCHEMA), txMgr.lockManager());
 
         log.info("Tables have been started");
@@ -366,43 +362,36 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
      */
     protected Int2ObjectOpenHashMap<RaftGroupService> startTable(String name, 
UUID tblId)
             throws Exception {
-        List<List<ClusterNode>> assignment = 
RendezvousAffinityFunction.assignPartitions(
-                cluster.stream().map(node -> 
node.topologyService().localMember())
-                        .collect(toList()),
+        List<Set<Assignment>> calculatedAssignments = 
AffinityUtils.calculateAssignments(
+                cluster.stream().map(node -> 
node.topologyService().localMember()).collect(toList()),
                 1,
-                replicas(),
-                false,
-                null
+                replicas()
         );
 
-        Map<ClusterNode, Function<Peer, Boolean>> isLocalPeerCheckerList = 
cluster.stream()
-                .map(ClusterService::topologyService)
-                .collect(toMap(
-                        TopologyService::localMember,
-                        ts -> peer -> 
ts.getByConsistentId(peer.consistentId()).equals(ts.localMember())
-                ));
+        List<Set<String>> assignments = calculatedAssignments.stream()
+                .map(a -> 
a.stream().map(Assignment::consistentId).collect(toSet()))
+                .collect(toList());
+
+        List<TablePartitionId> grpIds = IntStream.range(0, assignments.size())
+                .mapToObj(i -> new TablePartitionId(tblId, i))
+                .collect(toList());
 
         Int2ObjectOpenHashMap<RaftGroupService> clients = new 
Int2ObjectOpenHashMap<>();
 
         List<CompletableFuture<Void>> partitionReadyFutures = new 
ArrayList<>();
 
-        for (int p = 0; p < assignment.size(); p++) {
-            List<ClusterNode> partNodes = assignment.get(p);
-
-            TablePartitionId grpId = new TablePartitionId(tblId, p);
+        for (int p = 0; p < assignments.size(); p++) {
+            Set<String> partAssignments = assignments.get(p);
 
-            List<Peer> conf = partNodes.stream().map(n -> new Peer(n.name()))
-                    .collect(toList());
+            TablePartitionId grpId = grpIds.get(p);
 
-            for (ClusterNode node : partNodes) {
+            for (String assignment : partAssignments) {
                 var testMpPartStorage = new TestMvPartitionStorage(0);
                 var txStateStorage = new TestTxStateStorage();
-                var placementDriver = new 
PlacementDriver(replicaServices.get(node));
+                var placementDriver = new 
PlacementDriver(replicaServices.get(assignment), consistentIdToNode);
 
-                for (int part = 0; part < assignment.size(); part++) {
-                    ReplicationGroupId replicaGrpId = new 
TablePartitionId(tblId, part);
-
-                    placementDriver.updateAssignment(replicaGrpId, 
assignment.get(part));
+                for (int part = 0; part < assignments.size(); part++) {
+                    placementDriver.updateAssignment(grpIds.get(part), 
assignments.get(part));
                 }
 
                 int partId = p;
@@ -422,46 +411,43 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
                         row2tuple
                 ));
 
-                IndexLocker pkLocker = new HashIndexLocker(indexId, true, 
txManagers.get(node).lockManager(), row2tuple);
+                IndexLocker pkLocker = new HashIndexLocker(indexId, true, 
txManagers.get(assignment).lockManager(), row2tuple);
 
-                CompletableFuture<Void> partitionReadyFuture = 
raftServers.get(node).prepareRaftGroup(
+                CompletableFuture<Void> partitionReadyFuture = 
raftServers.get(assignment).prepareRaftGroup(
                         grpId,
-                        
partNodes.stream().map(ClusterNode::name).collect(toList()),
-                        () -> {
-                            return new PartitionListener(
-                                    new 
TestPartitionDataStorage(testMpPartStorage),
-                                    new TestTxStateStorage(),
-                                    txManagers.get(node),
-                                    () -> Map.of(pkStorage.get().id(), 
pkStorage.get()),
-                                    partId
-                            );
-                        },
+                        partAssignments,
+                        () -> new PartitionListener(
+                                new 
TestPartitionDataStorage(testMpPartStorage),
+                                new TestTxStateStorage(),
+                                txManagers.get(assignment),
+                                () -> Map.of(pkStorage.get().id(), 
pkStorage.get()),
+                                partId
+                        ),
                         RaftGroupOptions.defaults()
                 ).thenAccept(
                         raftSvc -> {
                             try {
                                 
PendingComparableValuesTracker<HybridTimestamp> safeTime =
-                                        new 
PendingComparableValuesTracker<>(clocks.get(node).now());
+                                        new 
PendingComparableValuesTracker<>(clocks.get(assignment).now());
 
-                                replicaManagers.get(node).startReplica(
+                                replicaManagers.get(assignment).startReplica(
                                         new TablePartitionId(tblId, partId),
                                         new PartitionReplicaListener(
                                                 testMpPartStorage,
                                                 raftSvc,
-                                                txManagers.get(node),
-                                                
txManagers.get(node).lockManager(),
+                                                txManagers.get(assignment),
+                                                
txManagers.get(assignment).lockManager(),
                                                 Runnable::run,
                                                 partId,
                                                 tblId,
                                                 () -> Map.of(pkLocker.id(), 
pkLocker),
                                                 pkStorage,
                                                 () -> Map.of(),
-                                                clocks.get(node),
+                                                clocks.get(assignment),
                                                 safeTime,
                                                 txStateStorage,
-                                                topologyServices.get(node),
                                                 placementDriver,
-                                                
isLocalPeerCheckerList.get(node)
+                                                peer -> 
assignment.equals(peer.consistentId())
                                         )
                                 );
                             } catch (NodeStoppingException e) {
@@ -473,6 +459,8 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
                 partitionReadyFutures.add(partitionReadyFuture);
             }
 
+            List<Peer> conf = 
partAssignments.stream().map(Peer::new).collect(toList());
+
             if (startClient()) {
                 RaftGroupService service = RaftGroupServiceImpl
                         .start(grpId, client, FACTORY, 10_000, conf, true, 
200, executor)
@@ -491,8 +479,7 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
 
                 service.shutdown();
 
-                Loza leaderSrv = raftServers
-                        
.get(tmpSvc.topologyService().getByConsistentId(leader.consistentId()));
+                Loza leaderSrv = raftServers.get(leader.consistentId());
 
                 RaftGroupService leaderClusterSvc = RaftGroupServiceImpl
                         .start(grpId, leaderSrv.service(), FACTORY,
@@ -518,7 +505,7 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
 
         assertNotNull(leader);
 
-        return 
raftServers.get(svc.clusterService().topologyService().getByConsistentId(leader.consistentId()));
+        return raftServers.get(leader.consistentId());
     }
 
     /**
@@ -540,7 +527,7 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
 
         IgniteUtils.shutdownAndAwaitTermination(executor, 10, 
TimeUnit.SECONDS);
 
-        for (Entry<ClusterNode, Loza> entry : raftServers.entrySet()) {
+        for (Entry<String, Loza> entry : raftServers.entrySet()) {
             Loza rs = entry.getValue();
 
             ReplicaManager replicaMgr = replicaManagers.get(entry.getKey());
@@ -604,8 +591,7 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
             fail("Unknown table " + t.name());
         }
 
-        TxManager manager = txManagers
-                
.get(clients.get(0).clusterService().topologyService().getByConsistentId(clients.get(0).leader().consistentId()));
+        TxManager manager = 
txManagers.get(clients.get(0).leader().consistentId());
 
         assertNotNull(manager);
 
@@ -617,7 +603,7 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
     protected boolean assertPartitionsSame(TableImpl table, int partId) {
         int hash = 0;
 
-        for (Map.Entry<ClusterNode, Loza> entry : raftServers.entrySet()) {
+        for (Map.Entry<String, Loza> entry : raftServers.entrySet()) {
             Loza svc = entry.getValue();
             JraftServerImpl server = (JraftServerImpl) svc.server();
             org.apache.ignite.raft.jraft.RaftGroupService grp = 
server.raftGroupService(new TablePartitionId(table.tableId(), partId));
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index fecb331b4e..1eee73eab6 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -21,7 +21,6 @@ import static java.util.Collections.unmodifiableMap;
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
-import static java.util.stream.Collectors.toList;
 import static 
org.apache.ignite.internal.configuration.util.ConfigurationUtil.getByInternalId;
 import static 
org.apache.ignite.internal.schema.SchemaManager.INITIAL_SCHEMA_VERSION;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
@@ -41,7 +40,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -66,11 +65,13 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.IntSupplier;
+import java.util.stream.Stream;
 import org.apache.ignite.configuration.ConfigurationChangeException;
 import org.apache.ignite.configuration.ConfigurationProperty;
 import 
org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
 import 
org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
 import org.apache.ignite.internal.affinity.AffinityUtils;
+import org.apache.ignite.internal.affinity.Assignment;
 import org.apache.ignite.internal.baseline.BaselineManager;
 import org.apache.ignite.internal.causality.VersionedValue;
 import org.apache.ignite.internal.configuration.util.ConfigurationUtil;
@@ -110,7 +111,6 @@ import org.apache.ignite.internal.table.IgniteTablesInternal;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.table.distributed.message.HasDataRequest;
-import 
org.apache.ignite.internal.table.distributed.message.HasDataRequestBuilder;
 import org.apache.ignite.internal.table.distributed.message.HasDataResponse;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
 import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
@@ -159,7 +159,6 @@ import org.apache.ignite.raft.jraft.storage.impl.VolatileRaftMetaStorage;
 import org.apache.ignite.raft.jraft.util.Utils;
 import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
 import org.apache.ignite.table.Table;
-import org.apache.ignite.table.manager.IgniteTables;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
@@ -167,8 +166,7 @@ import org.jetbrains.annotations.TestOnly;
 /**
  * Table manager.
  */
-public class TableManager extends Producer<TableEvent, TableEventParameters> 
implements IgniteTables, IgniteTablesInternal,
-        IgniteComponent {
+public class TableManager extends Producer<TableEvent, TableEventParameters> 
implements IgniteTablesInternal, IgniteComponent {
     /**
      * The special value of the last applied index to indicate the beginning 
of a full data rebalancing.
      *
@@ -274,8 +272,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
     /** Scan request executor. */
     private final ExecutorService scanRequestExecutor;
 
-    /** Separate executor for IO operations like partition storage 
initialization
-     * or partition raft group meta data persisting.
+    /**
+     * Separate executor for IO operations like partition storage 
initialization or partition raft group meta data persisting.
      */
     private final ExecutorService ioExecutor;
 
@@ -348,10 +346,10 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
         this.clock = clock;
         this.outgoingSnapshotsManager = outgoingSnapshotsManager;
 
-        placementDriver = new PlacementDriver(replicaSvc);
-
         clusterNodeResolver = topologyService::getByConsistentId;
 
+        placementDriver = new PlacementDriver(replicaSvc, clusterNodeResolver);
+
         tablesByIdVv = new VersionedValue<>(null, HashMap::new);
 
         registry.accept(token -> {
@@ -655,9 +653,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
         long causalityToken = assignmentsCtx.storageRevision();
 
-        List<Set<ClusterNode>> oldAssignments = assignmentsCtx.oldValue() == 
null ? null : ByteUtils.fromBytes(assignmentsCtx.oldValue());
+        List<Set<Assignment>> oldAssignments = assignmentsCtx.oldValue() == 
null ? null : ByteUtils.fromBytes(assignmentsCtx.oldValue());
 
-        List<Set<ClusterNode>> newAssignments = 
ByteUtils.fromBytes(assignmentsCtx.newValue());
+        List<Set<Assignment>> newAssignments = 
ByteUtils.fromBytes(assignmentsCtx.newValue());
 
         // Empty assignments might be a valid case if tables are created from 
within cluster init HOCON
         // configuration, which is not supported now.
@@ -670,12 +668,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
             futures[i] = new CompletableFuture<>();
         }
 
-        // TODO: IGNITE-16288 directAssignments should use async configuration 
API
-        CompletableFuture<List<Set<ClusterNode>>> assignmentsLatestFut = 
CompletableFuture.supplyAsync(() -> inBusyLock(busyLock, () ->
-                directAssignments(tblCfg)));
-
-        TopologyService topologyService = raftMgr.topologyService();
-        ClusterNode localMember = topologyService.localMember();
+        String localMemberName = 
raftMgr.topologyService().localMember().name();
 
         // Create new raft nodes according to new assignments.
         tablesByIdVv.update(causalityToken, (tablesById, e) -> {
@@ -689,122 +682,130 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
             for (int i = 0; i < partitions; i++) {
                 int partId = i;
 
-                Set<ClusterNode> oldPartAssignment = oldAssignments == null ? 
Collections.emptySet() :
-                        oldAssignments.get(partId);
-
-                Set<ClusterNode> newPartAssignment = 
newAssignments.get(partId);
-
-                List<String> newPartAssignmentIds = 
newPartAssignment.stream().map(ClusterNode::name).collect(toList());
+                Set<Assignment> oldPartAssignment = oldAssignments == null ? 
Set.of() : oldAssignments.get(partId);
 
+                Set<Assignment> newPartAssignment = newAssignments.get(partId);
 
                 TableImpl table = tablesById.get(tblId);
                 InternalTable internalTbl = table.internalTable();
 
-                MvTableStorage storage = internalTbl.storage();
-                boolean isInMemory = storage.isVolatile();
+                Set<String> newPeers = new HashSet<>();
+                Set<String> newLearners = new HashSet<>();
+                // Temporary variable for "localMemberAssignment" to be 
effectively final.
+                Assignment t = null;
 
-                // start new nodes, only if it is table creation
-                // other cases will be covered by rebalance logic
-                Set<ClusterNode> nodes = (oldPartAssignment.isEmpty()) ? 
newPartAssignment : Collections.emptySet();
+                for (Assignment assignment : newPartAssignment) {
+                    String consistentId = assignment.consistentId();
 
-                TablePartitionId replicaGrpId = new TablePartitionId(tblId, 
partId);
+                    if (localMemberName.equals(consistentId)) {
+                        t = assignment;
+                    }
 
-                placementDriver.updateAssignment(replicaGrpId, nodes);
+                    if (assignment.isPeer()) {
+                        newPeers.add(consistentId);
+                    } else {
+                        newLearners.add(consistentId);
+                    }
+                }
 
-                CompletableFuture<Void> startGroupFut = completedFuture(null);
+                Assignment localMemberAssignment = t;
 
-                PendingComparableValuesTracker<HybridTimestamp> safeTime = new 
PendingComparableValuesTracker<>(clock.now());
+                TablePartitionId replicaGrpId = new TablePartitionId(tblId, 
partId);
 
-                if (raftMgr.shouldHaveRaftGroupLocally(nodes)) {
-                    startGroupFut = 
getOrCreateMvPartition(internalTbl.storage(), partId)
-                            .thenComposeAsync(mvPartitionStorage -> 
assignmentsLatestFut.thenCompose(assignmentsLatest -> {
-                                boolean hasData = 
mvPartitionStorage.lastAppliedIndex() > 0;
+                placementDriver.updateAssignment(replicaGrpId, newPeers);
 
-                                CompletableFuture<Boolean> fut;
+                PendingComparableValuesTracker<HybridTimestamp> safeTime = new 
PendingComparableValuesTracker<>(clock.now());
 
-                                if (isInMemory || !hasData) {
-                                    Set<ClusterNode> partAssignments = 
assignmentsLatest.get(partId);
+                CompletableFuture<Void> startGroupFut;
 
-                                    fut = queryDataNodesCount(tblId, partId, 
partAssignments).thenApply(dataNodesCount -> {
-                                        boolean fullPartitionRestart = 
dataNodesCount == 0;
+                // start new nodes, only if it is table creation, other cases 
will be covered by rebalance logic
+                if (oldPartAssignment.isEmpty() && localMemberAssignment != 
null) {
+                    startGroupFut = 
getOrCreateMvPartition(internalTbl.storage(), 
partId).thenComposeAsync(mvPartitionStorage -> {
+                        boolean hasData = 
mvPartitionStorage.lastAppliedIndex() > 0;
 
-                                        if (fullPartitionRestart) {
-                                            return true;
-                                        }
+                        CompletableFuture<Boolean> fut;
 
-                                        boolean majorityAvailable = 
dataNodesCount >= (partAssignments.size() / 2) + 1;
+                        // If Raft is running in in-memory mode or the PDS has 
been cleared, we need to remove the current node
+                        // from the Raft group in order to avoid the double 
vote problem.
+                        // See 
https://issues.apache.org/jira/browse/IGNITE-16668 for details.
+                        if (internalTbl.storage().isVolatile() || !hasData) {
+                            fut = queryDataNodesCount(tblId, partId, 
newPeers).thenApply(dataNodesCount -> {
+                                boolean fullPartitionRestart = dataNodesCount 
== 0;
 
-                                        if (majorityAvailable) {
-                                            
RebalanceUtil.startPeerRemoval(replicaGrpId, localMember, metaStorageMgr);
+                                if (fullPartitionRestart) {
+                                    return true;
+                                }
 
-                                            return false;
-                                        } else {
-                                            // No majority and not a full 
partition restart - need to restart nodes
-                                            // with current partition.
-                                            String msg = "Unable to start 
partition " + partId + ". Majority not available.";
+                                boolean majorityAvailable = dataNodesCount >= 
(newPeers.size() / 2) + 1;
 
-                                            throw new 
IgniteInternalException(msg);
-                                        }
-                                    });
+                                if (majorityAvailable) {
+                                    
RebalanceUtil.startPeerRemoval(replicaGrpId, localMemberAssignment, 
metaStorageMgr);
+
+                                    return false;
                                 } else {
-                                    fut = completedFuture(true);
+                                    // No majority and not a full partition 
restart - need to restart nodes
+                                    // with current partition.
+                                    String msg = "Unable to start partition " 
+ partId + ". Majority not available.";
+
+                                    throw new IgniteInternalException(msg);
                                 }
+                            });
+                        } else {
+                            fut = completedFuture(true);
+                        }
 
-                                return fut.thenCompose(startGroup -> {
-                                    if (!startGroup) {
-                                        return completedFuture(null);
-                                    }
+                        return fut.thenCompose(startGroup -> {
+                            if (!startGroup) {
+                                return completedFuture(null);
+                            }
 
-                                    return CompletableFuture.supplyAsync(
-                                                    () -> 
getOrCreateTxStatePartitionStorage(internalTbl.txStateStorage(), partId),
-                                                    ioExecutor
-                                            )
-                                            
.thenComposeAsync(txStatePartitionStorage -> {
-                                                RaftGroupOptions groupOptions 
= groupOptionsForPartition(
-                                                        internalTbl.storage(),
-                                                        
internalTbl.txStateStorage(),
-                                                        
partitionKey(internalTbl, partId),
-                                                        newPartAssignment,
-                                                        safeTime
-                                                );
-
-                                                try {
-                                                    raftMgr.startRaftGroupNode(
+                            return 
getOrCreateTxStateStorageAsync(internalTbl.txStateStorage(), partId)
+                                    .thenAcceptAsync(txStatePartitionStorage 
-> {
+                                        RaftGroupOptions groupOptions = 
groupOptionsForPartition(
+                                                internalTbl.storage(),
+                                                internalTbl.txStateStorage(),
+                                                partitionKey(internalTbl, 
partId),
+                                                safeTime
+                                        );
+
+                                        try {
+                                            raftMgr.startRaftGroupNode(
+                                                    replicaGrpId,
+                                                    newPeers,
+                                                    newLearners,
+                                                    new PartitionListener(
+                                                            
partitionDataStorage(mvPartitionStorage, internalTbl, partId),
+                                                            
txStatePartitionStorage,
+                                                            txManager,
+                                                            
table.indexStorageAdapters(partId),
+                                                            partId
+                                                    ),
+                                                    new 
RebalanceRaftGroupEventsListener(
+                                                            metaStorageMgr,
+                                                            
tablesCfg.tables().get(table.name()),
                                                             replicaGrpId,
-                                                            
newPartAssignmentIds,
-                                                            new 
PartitionListener(
-                                                                    
partitionDataStorage(mvPartitionStorage, internalTbl, partId),
-                                                                    
txStatePartitionStorage,
-                                                                    txManager,
-                                                                    
table.indexStorageAdapters(partId),
-                                                                    partId
-                                                            ),
-                                                            new 
RebalanceRaftGroupEventsListener(
-                                                                    
metaStorageMgr,
-                                                                    
tablesCfg.tables().get(tablesById.get(tblId).name()),
-                                                                    
replicaGrpId,
-                                                                    partId,
-                                                                    busyLock,
-                                                                    
createPartitionMover(internalTbl, partId),
-                                                                    
this::calculateAssignments,
-                                                                    
rebalanceScheduler
-                                                            ),
-                                                            groupOptions
-                                                    );
-
-                                                    return 
completedFuture(null);
-                                                } catch (NodeStoppingException 
ex) {
-                                                    return failedFuture(ex);
-                                                }
-                                            }, ioExecutor);
-                                });
-                            }), ioExecutor);
+                                                            partId,
+                                                            busyLock,
+                                                            
createPartitionMover(internalTbl, partId),
+                                                            
this::calculateAssignments,
+                                                            rebalanceScheduler
+                                                    ),
+                                                    groupOptions
+                                            );
+                                        } catch (NodeStoppingException ex) {
+                                            throw new CompletionException(ex);
+                                        }
+                                    }, ioExecutor);
+                        });
+                    }, ioExecutor);
+                } else {
+                    startGroupFut = completedFuture(null);
                 }
 
                 startGroupFut
-                        .thenComposeAsync((v) -> {
+                        .thenComposeAsync(v -> {
                             try {
-                                return 
raftMgr.startRaftGroupService(replicaGrpId, newPartAssignmentIds);
+                                return 
raftMgr.startRaftGroupService(replicaGrpId, newPeers, newLearners);
                             } catch (NodeStoppingException ex) {
                                 return failedFuture(ex);
                             }
@@ -812,53 +813,47 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                         .thenCompose(updatedRaftGroupService -> {
                             ((InternalTableImpl) 
internalTbl).updateInternalTableRaftGroupService(partId, 
updatedRaftGroupService);
 
-                            if 
(replicaMgr.shouldHaveReplicationGroupLocally(nodes)) {
-                                return 
getOrCreateMvPartition(internalTbl.storage(), partId)
-                                        .thenCombine(
-                                                CompletableFuture.supplyAsync(
-                                                        () -> 
getOrCreateTxStatePartitionStorage(internalTbl.txStateStorage(), partId),
-                                                        ioExecutor
-                                                ),
-                                                (mvPartitionStorage, 
txStatePartitionStorage) -> {
-                                                    try {
-                                                        
replicaMgr.startReplica(replicaGrpId,
-                                                                new 
PartitionReplicaListener(
-                                                                        
mvPartitionStorage,
-                                                                        
updatedRaftGroupService,
-                                                                        
txManager,
-                                                                        
lockMgr,
-                                                                        
scanRequestExecutor,
-                                                                        partId,
-                                                                        tblId,
-                                                                        
table.indexesLockers(partId),
-                                                                        new 
Lazy<>(() -> table.indexStorageAdapters(partId)
-                                                                               
 .get().get(table.pkId())),
-                                                                        () -> 
table.indexStorageAdapters(partId).get(),
-                                                                        clock,
-                                                                        
safeTime,
-                                                                        
txStatePartitionStorage,
-                                                                        
topologyService,
-                                                                        
placementDriver,
-                                                                        
this::isLocalPeer
-                                                                )
-                                                        );
-                                                    } catch 
(NodeStoppingException ex) {
-                                                        throw new 
AssertionError("Loza was stopped before Table manager", ex);
-                                                    }
-
-                                                    return null;
-                                                });
-                            } else {
+                            if (localMemberAssignment == null) {
                                 return completedFuture(null);
                             }
-                        })
-                        .exceptionally(th -> {
-                            LOG.warn("Unable to update raft groups on the 
node", th);
 
-                            return null;
+                            CompletableFuture<MvPartitionStorage> 
partitionStorageFuture =
+                                    
getOrCreateMvPartition(internalTbl.storage(), partId);
+
+                            CompletableFuture<TxStateStorage> 
txStateStorageFuture =
+                                    
getOrCreateTxStateStorageAsync(internalTbl.txStateStorage(), partId);
+
+                            return 
partitionStorageFuture.thenAcceptBoth(txStateStorageFuture, (partitionStorage, 
txStateStorage) -> {
+                                try {
+                                    replicaMgr.startReplica(replicaGrpId,
+                                            new PartitionReplicaListener(
+                                                    partitionStorage,
+                                                    updatedRaftGroupService,
+                                                    txManager,
+                                                    lockMgr,
+                                                    scanRequestExecutor,
+                                                    partId,
+                                                    tblId,
+                                                    
table.indexesLockers(partId),
+                                                    new Lazy<>(() -> 
table.indexStorageAdapters(partId).get().get(table.pkId())),
+                                                    () -> 
table.indexStorageAdapters(partId).get(),
+                                                    clock,
+                                                    safeTime,
+                                                    txStateStorage,
+                                                    placementDriver,
+                                                    this::isLocalPeer
+                                            )
+                                    );
+                                } catch (NodeStoppingException ex) {
+                                    throw new AssertionError("Loza was stopped 
before Table manager", ex);
+                                }
+                            });
                         })
                         .whenComplete((res, ex) -> {
-                            // Only successful completion is possible here due 
to .exceptionally() just above.
+                            if (ex != null) {
+                                LOG.warn("Unable to update raft groups on the 
node", ex);
+                            }
+
                             futures[partId].complete(null);
                         });
             }
@@ -888,22 +883,25 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
      *
      * @param tblId Table id.
      * @param partId Partition id.
-     * @param partAssignments Partition assignments.
+     * @param peerNames Consistent IDs of Raft peers.
      * @return A future that will hold the quantity of data nodes.
      */
-    private CompletableFuture<Long> queryDataNodesCount(UUID tblId, int 
partId, Set<ClusterNode> partAssignments) {
-        HasDataRequestBuilder requestBuilder = 
TABLE_MESSAGES_FACTORY.hasDataRequest().tableId(tblId).partitionId(partId);
+    private CompletableFuture<Long> queryDataNodesCount(UUID tblId, int 
partId, Collection<String> peerNames) {
+        HasDataRequest request = 
TABLE_MESSAGES_FACTORY.hasDataRequest().tableId(tblId).partitionId(partId).build();
 
         //noinspection unchecked
-        CompletableFuture<Boolean>[] requestFutures = 
partAssignments.stream().map(node -> {
-            HasDataRequest request = requestBuilder.build();
-
-            return raftMgr.messagingService().invoke(node, request, 
QUERY_DATA_NODES_COUNT_TIMEOUT).thenApply(response -> {
-                assert response instanceof HasDataResponse : response;
-
-                return ((HasDataResponse) response).result();
-            }).exceptionally(unused -> false);
-        }).toArray(CompletableFuture[]::new);
+        CompletableFuture<Boolean>[] requestFutures = peerNames.stream()
+                .map(clusterNodeResolver)
+                .filter(Objects::nonNull)
+                .map(node -> raftMgr.messagingService()
+                        .invoke(node, request, QUERY_DATA_NODES_COUNT_TIMEOUT)
+                        .thenApply(response -> {
+                            assert response instanceof HasDataResponse : 
response;
+
+                            return ((HasDataResponse) response).result();
+                        })
+                        .exceptionally(unused -> false))
+                .toArray(CompletableFuture[]::new);
 
         return allOf(requestFutures)
                 .thenApply(unused -> 
Arrays.stream(requestFutures).filter(CompletableFuture::join).count());
@@ -913,7 +911,6 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
             MvTableStorage mvTableStorage,
             TxStateTableStorage txStateTableStorage,
             PartitionKey partitionKey,
-            Set<ClusterNode> peers,
             PendingComparableValuesTracker<HybridTimestamp> safeTime
     ) {
         RaftGroupOptions raftGroupOptions;
@@ -1065,7 +1062,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
      * Creates local structures for a table.
      *
      * @param causalityToken Causality token.
-     * @param name  Table name.
+     * @param name Table name.
      * @param tblId Table id.
      * @param partitions Count of partitions.
      * @return Future that will be completed when local changes related to the 
table creation are applied.
@@ -1082,7 +1079,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                 partitions, clusterNodeResolver, txManager, tableStorage, 
txStateStorage, replicaSvc, clock);
 
         // TODO: IGNITE-16288 directIndexIds should use async configuration API
-        var table = new TableImpl(internalTable, lockMgr,  () -> 
CompletableFuture.supplyAsync(() -> directIndexIds()));
+        var table = new TableImpl(internalTable, lockMgr, () -> 
CompletableFuture.supplyAsync(() -> directIndexIds()));
 
         tablesByIdVv.update(causalityToken, (previous, e) -> 
inBusyLock(busyLock, () -> {
             if (e != null) {
@@ -1177,9 +1174,9 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
      * Drops local structures for a table.
      *
      * @param causalityToken Causality token.
-     * @param name           Table name.
-     * @param tblId          Table id.
-     * @param assignment     Affinity assignment.
+     * @param name Table name.
+     * @param tblId Table id.
+     * @param assignment Affinity assignment.
      */
     private void dropTableLocally(long causalityToken, String name, UUID 
tblId, List<Set<ClusterNode>> assignment) {
         try {
@@ -1208,7 +1205,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
             TableImpl table = tablesByIdVv.latest().get(tblId);
 
             assert table != null : IgniteStringFormatter.format("There is no 
table with the name specified [name={}, id={}]",
-                name, tblId);
+                    name, tblId);
 
             CompletableFuture<Void> destroyMvStorageFuture = 
table.internalTable().storage().destroy();
 
@@ -1225,15 +1222,15 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
         }
     }
 
-    private Set<ClusterNode> calculateAssignments(TableConfiguration tableCfg, 
int partNum) {
+    private Set<Assignment> calculateAssignments(TableConfiguration tableCfg, 
int partNum) {
         return 
AffinityUtils.calculateAssignmentForPartition(baselineMgr.nodes(), partNum, 
tableCfg.value().replicas());
     }
 
     /**
-     * Creates a new table with the given {@code name} asynchronously. If a 
table with the same name already exists,
-     * a future will be completed with {@link TableAlreadyExistsException}.
+     * Creates a new table with the given {@code name} asynchronously. If a 
table with the same name already exists, a future will be
+     * completed with {@link TableAlreadyExistsException}.
      *
-     * @param name            Table name.
+     * @param name Table name.
      * @param tableInitChange Table changer.
      * @return Future representing pending completion of the operation.
      * @throws IgniteException If an unspecified platform exception has 
happened internally. Is thrown when:
@@ -1288,8 +1285,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                         
extConfCh.changeAssignments(ByteUtils.toBytes(AffinityUtils.calculateAssignments(
                                 baselineMgr.nodes(),
                                 tableChange.partitions(),
-                                tableChange.replicas(),
-                                HashSet::new)));
+                                tableChange.replicas())));
                     });
                 })).exceptionally(t -> {
                     Throwable ex = getRootCause(t);
@@ -1407,8 +1403,8 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
     }
 
     /**
-     * Drops a table with the name specified. If appropriate table does not be 
found, a future will be
-     * completed with {@link TableNotFoundException}.
+     * Drops a table with the name specified. If appropriate table does not be 
found, a future will be completed with
+     * {@link TableNotFoundException}.
      *
      * @param name Table name.
      * @return Future representing pending completion of the operation.
@@ -1448,8 +1444,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                             }
 
                             tblChg.delete(name);
-                        }
-                        ).changeIndexes(idxChg -> {
+                        }).changeIndexes(idxChg -> {
                             List<String> indicesNames = 
tablesCfg.indexes().value().namedListKeys();
 
                             indicesNames.stream().filter(idx ->
@@ -1528,16 +1523,6 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                 }));
     }
 
-    /**
-     * Returns assignments nodes.
-     *
-     * @param tblCfg Table configuration.
-     * @return Set assignments nodes.
-     */
-    private List<Set<ClusterNode>> 
directAssignments(ExtendedTableConfiguration tblCfg) {
-        return ByteUtils.fromBytes(directProxy(tblCfg.assignments()).value());
-    }
-
     /**
      * Collects a list of direct table ids.
      *
@@ -1654,7 +1639,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
      *
      * @param name Table name.
      * @return Future representing pending completion of the {@code 
TableManager#tableAsyncInternal} operation.
-     * */
+     */
     public CompletableFuture<TableImpl> tableAsyncInternal(String name) {
         if (!busyLock.enterBusy()) {
             throw new IgniteException(new NodeStoppingException());
@@ -1803,95 +1788,95 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
                     TablePartitionId replicaGrpId = new 
TablePartitionId(tblId, partId);
 
-                    // Assignments of the pending rebalance that we received 
through the meta storage watch mechanism.
-                    Set<ClusterNode> newPeers = 
ByteUtils.fromBytes(pendingAssignmentsWatchEvent.value());
-
-                    var pendingAssignments = 
metaStorageMgr.get(pendingPartAssignmentsKey(replicaGrpId)).join();
+                    Entry pendingAssignmentsEntry = 
metaStorageMgr.get(pendingPartAssignmentsKey(replicaGrpId)).join();
 
-                    assert pendingAssignmentsWatchEvent.revision() <= 
pendingAssignments.revision()
+                    assert pendingAssignmentsWatchEvent.revision() <= 
pendingAssignmentsEntry.revision()
                             : "Meta Storage watch cannot notify about an event 
with the revision that is more than the actual revision.";
 
+                    // Assignments of the pending rebalance that we received 
through the meta storage watch mechanism.
+                    Set<Assignment> pendingAssignments = 
ByteUtils.fromBytes(pendingAssignmentsWatchEvent.value());
+
                     TableImpl tbl = tablesByIdVv.latest().get(tblId);
 
                     ExtendedTableConfiguration tblCfg = 
(ExtendedTableConfiguration) tablesCfg.tables().get(tbl.name());
 
                     // Stable assignments from the meta store, which revision 
is bounded by the current pending event.
-                    byte[] stableAssignments = 
metaStorageMgr.get(stablePartAssignmentsKey(replicaGrpId),
+                    byte[] stableAssignmentsBytes = 
metaStorageMgr.get(stablePartAssignmentsKey(replicaGrpId),
                             
pendingAssignmentsWatchEvent.revision()).join().value();
 
-                    Set<ClusterNode> assignments = stableAssignments == null
+                    Set<Assignment> stableAssignments = stableAssignmentsBytes 
== null
                             // This is for the case when the first rebalance 
occurs.
-                            ? ((List<Set<ClusterNode>>) 
ByteUtils.fromBytes(tblCfg.assignments().value())).get(partId)
-                            : ByteUtils.fromBytes(stableAssignments);
+                            ? ((List<Set<Assignment>>) 
ByteUtils.fromBytes(tblCfg.assignments().value())).get(partId)
+                            : ByteUtils.fromBytes(stableAssignmentsBytes);
+
+                    List<String> stablePeers = new ArrayList<>();
+                    List<String> stableLearners = new ArrayList<>();
 
-                    placementDriver.updateAssignment(replicaGrpId, 
assignments);
+                    for (Assignment assignment : stableAssignments) {
+                        if (assignment.isPeer()) {
+                            stablePeers.add(assignment.consistentId());
+                        } else {
+                            stableLearners.add(assignment.consistentId());
+                        }
+                    }
+
+                    placementDriver.updateAssignment(replicaGrpId, 
stablePeers);
 
                     ClusterNode localMember = 
raftMgr.topologyService().localMember();
 
-                    List<ClusterNode> deltaPeers = newPeers.stream()
-                            .filter(p -> !assignments.contains(p))
-                            .collect(toList());
+                    // Start a new Raft node and Replica if this node has 
appeared in the new assignments.
+                    boolean shouldStartLocalServices = 
pendingAssignments.stream()
+                            .filter(assignment -> 
localMember.name().equals(assignment.consistentId()))
+                            .anyMatch(assignment -> 
!stableAssignments.contains(assignment));
 
                     PendingComparableValuesTracker<HybridTimestamp> safeTime = 
new PendingComparableValuesTracker<>(clock.now());
 
                     InternalTable internalTable = tbl.internalTable();
 
-                    try {
-                        LOG.info("Received update on pending assignments. 
Check if new raft group should be started"
-                                        + " [key={}, partition={}, table={}, 
localMemberAddress={}]",
-                                pendingAssignmentsWatchEvent.key(), partId, 
tbl.name(), localMember.address());
+                    LOG.info("Received update on pending assignments. Check if 
new raft group should be started"
+                                    + " [key={}, partition={}, table={}, 
localMemberAddress={}]",
+                            pendingAssignmentsWatchEvent.key(), partId, 
tbl.name(), localMember.address());
 
-                        if (raftMgr.shouldHaveRaftGroupLocally(deltaPeers)) {
-                            MvPartitionStorage mvPartitionStorage = 
getOrCreateMvPartition(internalTable.storage(), partId).join();
+                    if (shouldStartLocalServices) {
+                        MvPartitionStorage mvPartitionStorage = 
getOrCreateMvPartition(internalTable.storage(), partId).join();
 
-                            TxStateStorage txStatePartitionStorage = 
getOrCreateTxStatePartitionStorage(
-                                    internalTable.txStateStorage(),
-                                    partId
-                            );
+                        TxStateStorage txStatePartitionStorage = 
getOrCreateTxStateStorage(internalTable.txStateStorage(), partId);
 
-                            RaftGroupOptions groupOptions = 
groupOptionsForPartition(
-                                    internalTable.storage(),
-                                    internalTable.txStateStorage(),
-                                    partitionKey(internalTable, partId),
-                                    assignments,
-                                    safeTime
-                            );
+                        RaftGroupOptions groupOptions = 
groupOptionsForPartition(
+                                internalTable.storage(),
+                                internalTable.txStateStorage(),
+                                partitionKey(internalTable, partId),
+                                safeTime
+                        );
 
-                            RaftGroupListener raftGrpLsnr = new 
PartitionListener(
-                                    partitionDataStorage(mvPartitionStorage, 
internalTable, partId),
-                                    txStatePartitionStorage,
-                                    txManager,
-                                    tbl.indexStorageAdapters(partId),
-                                    partId
-                            );
+                        RaftGroupListener raftGrpLsnr = new PartitionListener(
+                                partitionDataStorage(mvPartitionStorage, 
internalTable, partId),
+                                txStatePartitionStorage,
+                                txManager,
+                                tbl.indexStorageAdapters(partId),
+                                partId
+                        );
 
-                            RaftGroupEventsListener raftGrpEvtsLsnr = new 
RebalanceRaftGroupEventsListener(
-                                    metaStorageMgr,
-                                    tblCfg,
-                                    replicaGrpId,
-                                    partId,
-                                    busyLock,
-                                    createPartitionMover(internalTable, 
partId),
-                                    TableManager.this::calculateAssignments,
-                                    rebalanceScheduler
-                            );
+                        RaftGroupEventsListener raftGrpEvtsLsnr = new 
RebalanceRaftGroupEventsListener(
+                                metaStorageMgr,
+                                tblCfg,
+                                replicaGrpId,
+                                partId,
+                                busyLock,
+                                createPartitionMover(internalTable, partId),
+                                TableManager.this::calculateAssignments,
+                                rebalanceScheduler
+                        );
 
+                        try {
                             raftMgr.startRaftGroupNode(
                                     replicaGrpId,
-                                    
assignments.stream().map(ClusterNode::name).collect(toList()),
+                                    stablePeers,
+                                    stableLearners,
                                     raftGrpLsnr,
                                     raftGrpEvtsLsnr,
                                     groupOptions
                             );
-                        }
-
-                        if 
(replicaMgr.shouldHaveReplicationGroupLocally(deltaPeers)) {
-                            MvPartitionStorage mvPartitionStorage = 
getOrCreateMvPartition(internalTable.storage(), partId).join();
-
-                            TxStateStorage txStatePartitionStorage = 
getOrCreateTxStatePartitionStorage(
-                                    internalTable.txStateStorage(),
-                                    partId
-                            );
 
                             replicaMgr.startReplica(replicaGrpId,
                                     new PartitionReplicaListener(
@@ -1908,35 +1893,43 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                                             clock,
                                             safeTime,
                                             txStatePartitionStorage,
-                                            raftMgr.topologyService(),
                                             placementDriver,
-                                            peer -> isLocalPeer(peer)
+                                            TableManager.this::isLocalPeer
                                     )
                             );
+                        } catch (NodeStoppingException e) {
+                            // no-op
                         }
-                    } catch (NodeStoppingException e) {
-                        // no-op
                     }
 
                     // Do not change peers of the raft group if this is a 
stale event.
                     // Note that we start raft node before for the sake of the 
consistency in a starting and stopping raft nodes.
-                    if (pendingAssignmentsWatchEvent.revision() < 
pendingAssignments.revision()) {
+                    if (pendingAssignmentsWatchEvent.revision() < 
pendingAssignmentsEntry.revision()) {
                         return true;
                     }
 
-                    List<Peer> newNodes = newPeers.stream().map(n -> new 
Peer(n.name())).collect(toList());
-
                     RaftGroupService partGrpSvc = 
internalTable.partitionRaftGroupService(partId);
 
                     LeaderWithTerm leaderWithTerm = 
partGrpSvc.refreshAndGetLeaderWithTerm().join();
 
                     // run update of raft configuration if this node is a 
leader
-                    if 
(localMember.name().equals(leaderWithTerm.leader().consistentId())) {
+                    if (isLocalPeer(leaderWithTerm.leader())) {
+                        List<Peer> newPeers = new ArrayList<>();
+                        List<Peer> newLearners = new ArrayList<>();
+
+                        for (Assignment assignment : pendingAssignments) {
+                            if (assignment.isPeer()) {
+                                newPeers.add(new 
Peer(assignment.consistentId()));
+                            } else {
+                                newLearners.add(new 
Peer(assignment.consistentId()));
+                            }
+                        }
+
                         LOG.info("Current node={} is the leader of partition 
raft group={}. "
                                         + "Initiate rebalance process for 
partition={}, table={}",
                                 localMember.address(), replicaGrpId, partId, 
tbl.name());
-                        // TODO: Provide learners during rebalance, see 
https://issues.apache.org/jira/browse/IGNITE-18172
-                        partGrpSvc.changePeersAsync(newNodes, List.of(), 
leaderWithTerm.term()).join();
+
+                        partGrpSvc.changePeersAsync(newPeers, newLearners, 
leaderWithTerm.term()).join();
                     }
 
                     return true;
@@ -1972,25 +1965,28 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
                     TablePartitionId replicaGrpId = new 
TablePartitionId(tblId, part);
 
-                    Set<ClusterNode> stableAssignments = 
ByteUtils.fromBytes(stableAssignmentsWatchEvent.value());
+                    Set<Assignment> stableAssignments = 
ByteUtils.fromBytes(stableAssignmentsWatchEvent.value());
 
                     byte[] pendingFromMetastorage = 
metaStorageMgr.get(pendingPartAssignmentsKey(replicaGrpId),
                             
stableAssignmentsWatchEvent.revision()).join().value();
 
-                    Set<ClusterNode> pendingAssignments = 
pendingFromMetastorage == null
-                            ? Collections.emptySet()
+                    Set<Assignment> pendingAssignments = 
pendingFromMetastorage == null
+                            ? Set.of()
                             : ByteUtils.fromBytes(pendingFromMetastorage);
 
-                    try {
-                        ClusterNode localMember = 
raftMgr.topologyService().localMember();
+                    String localMemberName = 
raftMgr.topologyService().localMember().name();
+
+                    boolean shouldStopLocalServices = 
Stream.concat(stableAssignments.stream(), pendingAssignments.stream())
+                            .noneMatch(assignment -> 
assignment.consistentId().equals(localMemberName));
 
-                        if (!stableAssignments.contains(localMember) && 
!pendingAssignments.contains(localMember)) {
+                    if (shouldStopLocalServices) {
+                        try {
                             raftMgr.stopRaftGroup(replicaGrpId);
 
                             replicaMgr.stopReplica(new TablePartitionId(tblId, 
part));
+                        } catch (NodeStoppingException e) {
+                            // no-op
                         }
-                    } catch (NodeStoppingException e) {
-                        // no-op
                     }
 
                     return true;
@@ -2062,21 +2058,22 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
      * in when the rebalance was interrupted.
      *
      * @param mvTableStorage Multi-versioned table storage.
-     * @param partitioId Partition ID.
+     * @param partitionId Partition ID.
      * @return Future that will complete when the operation completes.
      */
-    private static CompletableFuture<MvPartitionStorage> 
getOrCreateMvPartition(MvTableStorage mvTableStorage, int partitioId) {
-        MvPartitionStorage mvPartitionStorage = 
mvTableStorage.getOrCreateMvPartition(partitioId);
-
-        // If a full rebalance did not happen, then we return the storage as 
is.
-        if (mvPartitionStorage.persistedIndex() != FULL_RABALANCING_STARTED) {
-            return completedFuture(mvPartitionStorage);
-        }
-
-        // A full rebalance was started but not completed, so the partition 
must be recreated to remove the garbage.
-        return mvTableStorage
-                .destroyPartition(partitioId)
-                .thenApply(unused -> 
mvTableStorage.getOrCreateMvPartition(partitioId));
+    private CompletableFuture<MvPartitionStorage> 
getOrCreateMvPartition(MvTableStorage mvTableStorage, int partitionId) {
+        return CompletableFuture.supplyAsync(() -> 
mvTableStorage.getOrCreateMvPartition(partitionId), ioExecutor)
+                .thenCompose(storage -> {
+                    if (storage.persistedIndex() != FULL_RABALANCING_STARTED) {
+                        // If a full rebalance did not happen, then we return 
the storage as is.
+                        return completedFuture(storage);
+                    } else {
+                        // A full rebalance was started but not completed, so 
the partition must be recreated to remove the garbage.
+                        return mvTableStorage
+                                .destroyPartition(partitionId)
+                                .thenApplyAsync(unused -> 
mvTableStorage.getOrCreateMvPartition(partitionId), ioExecutor);
+                    }
+                });
     }
 
     /**
@@ -2088,10 +2085,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
      * @param txStateTableStorage Transaction state storage for a table.
      * @param partId Partition ID.
      */
-    private static TxStateStorage getOrCreateTxStatePartitionStorage(
-            TxStateTableStorage txStateTableStorage,
-            int partId
-    ) {
+    private static TxStateStorage 
getOrCreateTxStateStorage(TxStateTableStorage txStateTableStorage, int partId) {
         TxStateStorage txStatePartitionStorage = 
txStateTableStorage.getOrCreateTxStateStorage(partId);
 
         // If a full rebalance did not happen, then we return the storage as 
is.
@@ -2103,4 +2097,11 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
         return txStateTableStorage.getOrCreateTxStateStorage(partId);
     }
+
+    /**
+     * Async version of {@link #getOrCreateTxStateStorage}.
+     */
+    private CompletableFuture<TxStateStorage> 
getOrCreateTxStateStorageAsync(TxStateTableStorage txStateTableStorage, int 
partId) {
+        return CompletableFuture.supplyAsync(() -> 
getOrCreateTxStateStorage(txStateTableStorage, partId), ioExecutor);
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java
index e7eecc8724..79c5be150f 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java
@@ -26,8 +26,6 @@ import static 
org.apache.ignite.internal.metastorage.client.Operations.remove;
 import static org.apache.ignite.internal.utils.RebalanceUtil.intersect;
 import static 
org.apache.ignite.internal.utils.RebalanceUtil.pendingPartAssignmentsKey;
 import static 
org.apache.ignite.internal.utils.RebalanceUtil.plannedPartAssignmentsKey;
-import static org.apache.ignite.internal.utils.RebalanceUtil.readClusterNodes;
-import static 
org.apache.ignite.internal.utils.RebalanceUtil.resolveClusterNodes;
 import static 
org.apache.ignite.internal.utils.RebalanceUtil.stablePartAssignmentsKey;
 import static org.apache.ignite.internal.utils.RebalanceUtil.subtract;
 import static org.apache.ignite.internal.utils.RebalanceUtil.switchAppendKey;
@@ -46,6 +44,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.affinity.Assignment;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
@@ -62,7 +63,6 @@ import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.lang.ByteArray;
-import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.raft.client.Peer;
 import org.apache.ignite.raft.jraft.Status;
 import org.apache.ignite.raft.jraft.entity.PeerId;
@@ -80,7 +80,7 @@ public class RebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener
     private static final int REBALANCE_RETRY_THRESHOLD = 10;
 
     /** Delay between unsuccessful trial of a rebalance and a new trial, ms. */
-    public static final int REBALANCE_RETRY_DELAY_MS = 200;
+    private static final int REBALANCE_RETRY_DELAY_MS = 200;
 
     /** Success code for the MetaStorage switch append assignments change. */
     private static final int SWITCH_APPEND_SUCCESS = 1;
@@ -131,7 +131,7 @@ public class RebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener
     private final AtomicInteger rebalanceAttempts =  new AtomicInteger(0);
 
     /** Function that calculates assignments for table's partition. */
-    private final BiFunction<TableConfiguration, Integer, Set<ClusterNode>> 
calculateAssignmentsFn;
+    private final BiFunction<TableConfiguration, Integer, Set<Assignment>> 
calculateAssignmentsFn;
 
     /**
      * Constructs new listener.
@@ -152,7 +152,7 @@ public class RebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener
             int partNum,
             IgniteSpinBusyLock busyLock,
             PartitionMover partitionMover,
-            BiFunction<TableConfiguration, Integer, Set<ClusterNode>> 
calculateAssignmentsFn,
+            BiFunction<TableConfiguration, Integer, Set<Assignment>> 
calculateAssignmentsFn,
             ScheduledExecutorService rebalanceScheduler) {
         this.metaStorageMgr = metaStorageMgr;
         this.tblConfiguration = tblConfiguration;
@@ -180,17 +180,29 @@ public class RebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener
                 try {
                     rebalanceAttempts.set(0);
 
-                    Entry pendingEntry = 
metaStorageMgr.get(pendingPartAssignmentsKey(partId)).get();
+                    byte[] pendingAssignmentsBytes = 
metaStorageMgr.get(pendingPartAssignmentsKey(partId)).get().value();
 
-                    if (pendingEntry.value() != null) {
-                        Set<ClusterNode> pendingNodes = 
ByteUtils.fromBytes(pendingEntry.value());
+                    if (pendingAssignmentsBytes != null) {
+                        Set<Assignment> pendingAssignments = 
ByteUtils.fromBytes(pendingAssignmentsBytes);
 
-                        LOG.info("New leader elected. Going to reconfigure 
peers [group={}, partition={}, table={}, peers={}]",
-                                partId, partNum, 
tblConfiguration.name().value(), pendingNodes);
+                        List<Peer> peers = new ArrayList<>();
+                        List<Peer> learners = new ArrayList<>();
 
-                        
partitionMover.movePartition(clusterNodesToPeers(pendingNodes), List.of(), 
term).join();
+                        for (Assignment assignment : pendingAssignments) {
+                            if (assignment.isPeer()) {
+                                peers.add(new Peer(assignment.consistentId()));
+                            } else {
+                                learners.add(new 
Peer(assignment.consistentId()));
+                            }
+                        }
+
+                        LOG.info("New leader elected. Going to apply new 
configuration "
+                                        + "[group={}, partition={}, table={}, 
peers={}, learners={}]",
+                                partId, partNum, 
tblConfiguration.name().value(), peers, learners);
+
+                        partitionMover.movePartition(peers, learners, 
term).get();
                     }
-                } catch (InterruptedException | ExecutionException e) {
+                } catch (Exception e) {
                     // TODO: IGNITE-14693
                     LOG.warn("Unable to start rebalance [partition={}, 
table={}, term={}]",
                             e, partNum, tblConfiguration.name().value(), term);
@@ -320,31 +332,31 @@ public class RebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener
             Entry switchReduceEntry = values.get(switchReduceKey);
             Entry switchAppendEntry = values.get(switchAppendKey);
 
-            Set<ClusterNode> calculatedAssignments = 
calculateAssignmentsFn.apply(tblConfiguration, partNum);
+            Set<Assignment> retrievedStable = readAssignments(stableEntry);
+            Set<Assignment> retrievedSwitchReduce = 
readAssignments(switchReduceEntry);
+            Set<Assignment> retrievedSwitchAppend = 
readAssignments(switchAppendEntry);
 
-            Set<ClusterNode> stable = resolveClusterNodes(peers, 
pendingEntry.value(), stableEntry.value());
+            Set<Assignment> calculatedAssignments = 
calculateAssignmentsFn.apply(tblConfiguration, partNum);
 
-            Set<ClusterNode> retrievedSwitchReduce = 
readClusterNodes(switchReduceEntry);
-            Set<ClusterNode> retrievedSwitchAppend = 
readClusterNodes(switchAppendEntry);
-            Set<ClusterNode> retrievedStable = readClusterNodes(stableEntry);
+            Set<Assignment> stable = createAssignments(peers, learners);
 
             // Were reduced
-            Set<ClusterNode> reducedNodes = subtract(retrievedSwitchReduce, 
stable);
+            Set<Assignment> reducedNodes = subtract(retrievedSwitchReduce, 
stable);
 
             // Were added
-            Set<ClusterNode> addedNodes = subtract(stable, retrievedStable);
+            Set<Assignment> addedNodes = subtract(stable, retrievedStable);
 
             // For further reduction
-            Set<ClusterNode> calculatedSwitchReduce = 
subtract(retrievedSwitchReduce, reducedNodes);
+            Set<Assignment> calculatedSwitchReduce = 
subtract(retrievedSwitchReduce, reducedNodes);
 
             // For further addition
-            Set<ClusterNode> calculatedSwitchAppend = 
union(retrievedSwitchAppend, reducedNodes);
+            Set<Assignment> calculatedSwitchAppend = 
union(retrievedSwitchAppend, reducedNodes);
             calculatedSwitchAppend = subtract(calculatedSwitchAppend, 
addedNodes);
             calculatedSwitchAppend = intersect(calculatedAssignments, 
calculatedSwitchAppend);
 
-            Set<ClusterNode> calculatedPendingReduction = subtract(stable, 
retrievedSwitchReduce);
+            Set<Assignment> calculatedPendingReduction = subtract(stable, 
retrievedSwitchReduce);
 
-            Set<ClusterNode> calculatedPendingAddition = union(stable, 
reducedNodes);
+            Set<Assignment> calculatedPendingAddition = union(stable, 
reducedNodes);
             calculatedPendingAddition = intersect(calculatedAssignments, 
calculatedPendingAddition);
 
             // eq(revision(assignments.stable), 
retrievedAssignmentsStable.revision)
@@ -368,7 +380,7 @@ public class RebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener
 
             // TODO: https://issues.apache.org/jira/browse/IGNITE-17592 Remove 
synchronous wait
             tblConfiguration.change(ch -> {
-                List<Set<ClusterNode>> assignments = 
ByteUtils.fromBytes(((ExtendedTableChange) ch).assignments());
+                List<Set<Assignment>> assignments = 
ByteUtils.fromBytes(((ExtendedTableChange) ch).assignments());
                 assignments.set(partNum, stable);
                 ((ExtendedTableChange) 
ch).changeAssignments(ByteUtils.toBytes(assignments));
             }).get(10, TimeUnit.SECONDS);
@@ -486,22 +498,6 @@ public class RebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener
         }
     }
 
-    /**
-     * Transforms list of cluster nodes to the list of peers.
-     *
-     * @param nodes List of cluster nodes to transform.
-     * @return List of transformed peers.
-     */
-    private static List<Peer> clusterNodesToPeers(Set<ClusterNode> nodes) {
-        List<Peer> peers = new ArrayList<>(nodes.size());
-
-        for (ClusterNode node : nodes) {
-            peers.add(new Peer(node.name()));
-        }
-
-        return peers;
-    }
-
     /**
      * Transforms list of peerIds to list of peers.
      *
@@ -517,4 +513,28 @@ public class RebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener
 
         return peers;
     }
+
+    /**
+     * Creates a set of assignments from the given collections of peers and learners.
+     */
+    private static Set<Assignment> createAssignments(Collection<PeerId> peers, 
Collection<PeerId> learners) {
+        Stream<Assignment> newAssignments = Stream.concat(
+                peers.stream().map(peerId -> 
Assignment.forPeer(peerId.getConsistentId())),
+                learners.stream().map(peerId -> 
Assignment.forLearner(peerId.getConsistentId()))
+        );
+
+        return newAssignments.collect(Collectors.toSet());
+    }
+
+    /**
+     * Reads a set of assignments from a MetaStorage entry.
+     *
+     * @param entry MetaStorage entry.
+     * @return Set of assignments, or an empty set if the entry has no value.
+     */
+    private static Set<Assignment> readAssignments(Entry entry) {
+        byte[] value = entry.value();
+
+        return value == null ? Set.of() : ByteUtils.fromBytes(value);
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/LeaderOrTxState.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/LeaderOrTxState.java
index d36859d084..a37a7b512f 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/LeaderOrTxState.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/LeaderOrTxState.java
@@ -20,18 +20,17 @@ package 
org.apache.ignite.internal.table.distributed.replicator;
 import java.io.Serializable;
 import org.apache.ignite.internal.tx.TxMeta;
 import org.apache.ignite.internal.tx.message.TxStateReplicaRequest;
-import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Response for the {@link TxStateReplicaRequest}. Can contain either the 
Partition Group leader, which should be
+ * Response for the {@link TxStateReplicaRequest}. Can contain either the 
consistent ID of the Partition Group leader, which should be
  * queried for the TX Meta, or the TX Meta itself.
  */
 public class LeaderOrTxState implements Serializable {
     private static final long serialVersionUID = -3555591755828355117L;
 
     @Nullable
-    private final ClusterNode leader;
+    private final String leaderName;
 
     @Nullable
     private final TxMeta txMeta;
@@ -39,16 +38,16 @@ public class LeaderOrTxState implements Serializable {
     /**
      * Creates a response.
      *
-     * @param leader Leader node.
+     * @param leaderName Leader consistent ID.
      * @param txMeta TX meta.
      */
-    public LeaderOrTxState(@Nullable ClusterNode leader, @Nullable TxMeta 
txMeta) {
-        this.leader = leader;
+    public LeaderOrTxState(@Nullable String leaderName, @Nullable TxMeta 
txMeta) {
+        this.leaderName = leaderName;
         this.txMeta = txMeta;
     }
 
-    public @Nullable ClusterNode leader() {
-        return leader;
+    public @Nullable String leaderName() {
+        return leaderName;
     }
 
     public @Nullable TxMeta txMeta() {
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 5dbb8f0475..cee18b3a9e 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -108,7 +108,6 @@ import org.apache.ignite.lang.ErrorGroups.Replicator;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.network.TopologyService;
 import org.apache.ignite.raft.client.Command;
 import org.apache.ignite.raft.client.Peer;
 import org.apache.ignite.raft.client.service.RaftGroupService;
@@ -160,9 +159,6 @@ public class PartitionReplicaListener implements 
ReplicaListener {
     /** Tx state storage. */
     private final TxStateStorage txStateStorage;
 
-    /** Topology service. */
-    private final TopologyService topologyService;
-
     /** Hybrid clock. */
     private final HybridClock hybridClock;
 
@@ -203,7 +199,6 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @param hybridClock Hybrid clock.
      * @param safeTime Safe time clock.
      * @param txStateStorage Transaction state storage.
-     * @param topologyService Topology services.
      * @param placementDriver Placement driver.
      * @param isLocalPeerChecker Function for checking that the given peer is 
local.
      */
@@ -221,7 +216,6 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             HybridClock hybridClock,
             PendingComparableValuesTracker<HybridTimestamp> safeTime,
             TxStateStorage txStateStorage,
-            TopologyService topologyService,
             PlacementDriver placementDriver,
             Function<Peer, Boolean> isLocalPeerChecker
     ) {
@@ -238,7 +232,6 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         this.hybridClock = hybridClock;
         this.safeTime = safeTime;
         this.txStateStorage = txStateStorage;
-        this.topologyService = topologyService;
         this.placementDriver = placementDriver;
         this.isLocalPeerChecker = isLocalPeerChecker;
 
@@ -310,7 +303,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
                         return txStateFut.thenApply(txMeta -> new 
LeaderOrTxState(null, txMeta));
                     } else {
-                        return completedFuture(new 
LeaderOrTxState(topologyService.getByConsistentId(leader.consistentId()), 
null));
+                        return completedFuture(new 
LeaderOrTxState(leader.consistentId(), null));
                     }
                 });
     }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PlacementDriver.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PlacementDriver.java
index 9710a5f0d8..ea43c02085 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PlacementDriver.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PlacementDriver.java
@@ -20,35 +20,42 @@ package 
org.apache.ignite.internal.table.distributed.replicator;
 import java.util.Collection;
 import java.util.LinkedHashSet;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.tx.TxMeta;
 import org.apache.ignite.internal.tx.message.TxStateReplicaRequest;
+import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.network.ClusterNode;
 
 /**
  * Placement driver.
  */
 public class PlacementDriver {
-    /** Assignment nodes per replication group. */
-    private final Map<ReplicationGroupId, LinkedHashSet<ClusterNode>> 
primaryReplicaMapping = new ConcurrentHashMap<>();
+    /** Assignment node names per replication group. */
+    private final Map<ReplicationGroupId, LinkedHashSet<String>> 
primaryReplicaMapping = new ConcurrentHashMap<>();
 
     /** Replication service. */
     private final ReplicaService replicaService;
 
+    /** Function that resolves a node consistent ID to a cluster node. */
+    private final Function<String, ClusterNode> clusterNodeResolver;
+
     /**
      * The constructor.
      *
      * @param replicaService Replication service.
      */
-    public PlacementDriver(ReplicaService replicaService) {
+    public PlacementDriver(ReplicaService replicaService, Function<String, 
ClusterNode> clusterNodeResolver) {
         this.replicaService = replicaService;
+        this.clusterNodeResolver = clusterNodeResolver;
     }
 
     /**
-     * Sends a transaction sate request to the primary replica.
+     * Sends a transaction state request to the primary replica.
      *
      * @param replicaGrp Replication group id.
      * @param request Status request.
@@ -66,10 +73,10 @@ public class PlacementDriver {
      * Updates an assignment for the specific replication group.
      *
      * @param replicaGrpId Replication group id.
-     * @param assignment Assignment.
+     * @param nodeNames Assignment node names.
      */
-    public void updateAssignment(ReplicationGroupId replicaGrpId, 
Collection<ClusterNode> assignment) {
-        primaryReplicaMapping.put(replicaGrpId, new 
LinkedHashSet<>(assignment));
+    public void updateAssignment(ReplicationGroupId replicaGrpId, 
Collection<String> nodeNames) {
+        primaryReplicaMapping.put(replicaGrpId, new 
LinkedHashSet<>(nodeNames));
     }
 
     /**
@@ -81,19 +88,23 @@ public class PlacementDriver {
      * @param request Request.
      */
     private void sendAndRetry(CompletableFuture<TxMeta> resFut, 
ReplicationGroupId replicaGrp, TxStateReplicaRequest request) {
-        ClusterNode nodeToSend = 
primaryReplicaMapping.get(replicaGrp).iterator().next();
+        ClusterNode nodeToSend = primaryReplicaMapping.get(replicaGrp).stream()
+                .map(clusterNodeResolver)
+                .filter(Objects::nonNull)
+                .findFirst()
+                .orElseThrow(() -> new IgniteInternalException("All replica 
nodes are unavailable"));
 
         replicaService.invoke(nodeToSend, request).thenAccept(resp -> {
             assert resp instanceof LeaderOrTxState : "Unsupported response 
type [type=" + resp.getClass().getSimpleName() + ']';
 
-            LeaderOrTxState stateAndLeader = (LeaderOrTxState) resp;
+            LeaderOrTxState stateOrLeader = (LeaderOrTxState) resp;
 
-            ClusterNode nextNodeToSend = stateAndLeader.leader();
+            String nextNodeToSend = stateOrLeader.leaderName();
 
             if (nextNodeToSend == null) {
-                resFut.complete(stateAndLeader.txMeta());
+                resFut.complete(stateOrLeader.txMeta());
             } else {
-                LinkedHashSet<ClusterNode> newAssignment = new 
LinkedHashSet<>();
+                LinkedHashSet<String> newAssignment = new LinkedHashSet<>();
 
                 newAssignment.add(nextNodeToSend);
                 newAssignment.addAll(primaryReplicaMapping.get(replicaGrp));
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
index c987ed95e9..6c1f98358e 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
@@ -26,22 +26,18 @@ import static 
org.apache.ignite.internal.metastorage.client.If.iif;
 import static org.apache.ignite.internal.metastorage.client.Operations.ops;
 import static org.apache.ignite.internal.metastorage.client.Operations.put;
 import static org.apache.ignite.internal.metastorage.client.Operations.remove;
-import static org.apache.ignite.internal.util.IgniteUtils.capacity;
 
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.affinity.AffinityUtils;
+import org.apache.ignite.internal.affinity.Assignment;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
-import org.apache.ignite.internal.metastorage.client.Conditions;
 import org.apache.ignite.internal.metastorage.client.Entry;
 import org.apache.ignite.internal.metastorage.client.If;
 import org.apache.ignite.internal.metastorage.client.Operations;
@@ -49,9 +45,7 @@ import 
org.apache.ignite.internal.metastorage.client.WatchEvent;
 import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.lang.ByteArray;
-import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.network.ClusterNode;
-import org.apache.ignite.raft.jraft.entity.PeerId;
 import org.jetbrains.annotations.NotNull;
 
 /**
@@ -104,7 +98,7 @@ public class RebalanceUtil {
 
         ByteArray partAssignmentsStableKey = stablePartAssignmentsKey(partId);
 
-        Set<ClusterNode> partAssignments = 
AffinityUtils.calculateAssignmentForPartition(baselineNodes, partNum, replicas);
+        Set<Assignment> partAssignments = 
AffinityUtils.calculateAssignmentForPartition(baselineNodes, partNum, replicas);
 
         byte[] partAssignmentsBytes = ByteUtils.toBytes(partAssignments);
 
@@ -296,13 +290,13 @@ public class RebalanceUtil {
      * storage has been cleared.
      *
      * @param partId Partition's raft group id.
-     * @param clusterNode Cluster node to be removed from peers.
+     * @param peerAssignment Assignment of the peer to be removed.
      * @param metaStorageMgr MetaStorage manager.
      * @return Completable future that signifies the completion of this 
operation.
      */
     public static CompletableFuture<Void> startPeerRemoval(
             TablePartitionId partId,
-            ClusterNode clusterNode,
+            Assignment peerAssignment,
             MetaStorageManager metaStorageMgr
     ) {
         ByteArray key = switchReduceKey(partId);
@@ -312,29 +306,29 @@ public class RebalanceUtil {
                     byte[] prevValue = 
retrievedAssignmentsSwitchReduce.value();
 
                     if (prevValue != null) {
-                        Set<ClusterNode> prev = ByteUtils.fromBytes(prevValue);
+                        Set<Assignment> prev = ByteUtils.fromBytes(prevValue);
 
-                        prev.add(clusterNode);
+                        prev.add(peerAssignment);
 
                         return metaStorageMgr.invoke(
                                 
revision(key).eq(retrievedAssignmentsSwitchReduce.revision()),
-                                Operations.put(key, ByteUtils.toBytes(prev)),
+                                put(key, ByteUtils.toBytes(prev)),
                                 Operations.noop()
                         );
                     } else {
                         var newValue = new HashSet<>();
 
-                        newValue.add(clusterNode);
+                        newValue.add(peerAssignment);
 
                         return metaStorageMgr.invoke(
-                                Conditions.notExists(key),
-                                Operations.put(key, 
ByteUtils.toBytes(newValue)),
+                                notExists(key),
+                                put(key, ByteUtils.toBytes(newValue)),
                                 Operations.noop()
                         );
                     }
                 }).thenCompose(res -> {
                     if (!res) {
-                        return startPeerRemoval(partId, clusterNode, 
metaStorageMgr);
+                        return startPeerRemoval(partId, peerAssignment, 
metaStorageMgr);
                     }
 
                     return CompletableFuture.completedFuture(null);
@@ -358,23 +352,23 @@ public class RebalanceUtil {
         Entry entry = event.entryEvent().newEntry();
         byte[] eventData = entry.value();
 
-        Set<ClusterNode> assignments = 
AffinityUtils.calculateAssignmentForPartition(baselineNodes, partNum, replicas);
+        Set<Assignment> switchReduce = ByteUtils.fromBytes(eventData);
 
-        Set<ClusterNode> switchReduce = ByteUtils.fromBytes(eventData);
+        if (switchReduce.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        Set<Assignment> assignments = 
AffinityUtils.calculateAssignmentForPartition(baselineNodes, partNum, replicas);
 
         ByteArray pendingKey = pendingPartAssignmentsKey(partId);
 
-        Set<ClusterNode> pendingAssignments = subtract(assignments, 
switchReduce);
+        Set<Assignment> pendingAssignments = subtract(assignments, 
switchReduce);
 
         byte[] pendingByteArray = ByteUtils.toBytes(pendingAssignments);
         byte[] assignmentsByteArray = ByteUtils.toBytes(assignments);
 
-        if (switchReduce.isEmpty()) {
-            return CompletableFuture.completedFuture(null);
-        }
-
         ByteArray changeTriggerKey = partChangeTriggerKey(partId);
-        byte[] rev = 
ByteUtils.longToBytes(event.entryEvent().newEntry().revision());
+        byte[] rev = ByteUtils.longToBytes(entry.revision());
 
         // Here is what happens in the MetaStorage:
         // if ((notExists(changeTriggerKey) || value(changeTriggerKey) < 
revision) && (notExists(pendingKey) && notExists(stableKey)) {
@@ -412,57 +406,6 @@ public class RebalanceUtil {
         return metaStorageMgr.invoke(resultingOperation).thenApply(unused -> 
null);
     }
 
-    /**
-     * Builds a list of cluster nodes based on a list of peers, pending and 
stable assignments.
-     * A peer will be added to the result list iff peer's consistent ID is 
present in pending or stable assignments.
-     *
-     * @param peers List of peers.
-     * @param pendingAssignments Byte array that contains serialized list of 
pending assignments.
-     * @param stableAssignments Byte array that contains serialized list of 
stable assignments.
-     * @return Resolved cluster nodes.
-     */
-    public static Set<ClusterNode> resolveClusterNodes(Collection<PeerId> 
peers, byte[] pendingAssignments, byte[] stableAssignments) {
-        Map<String, ClusterNode> resolveRegistry = new HashMap<>();
-
-        if (pendingAssignments != null) {
-            Set<ClusterNode> pending = ByteUtils.fromBytes(pendingAssignments);
-            pending.forEach(n -> resolveRegistry.put(n.name(), n));
-        }
-
-        if (stableAssignments != null) {
-            Set<ClusterNode> stable = ByteUtils.fromBytes(stableAssignments);
-            stable.forEach(n -> resolveRegistry.put(n.name(), n));
-        }
-
-        var resolvedNodes = new HashSet<ClusterNode>(capacity(peers.size()));
-
-        for (PeerId p : peers) {
-            ClusterNode resolvedNode = 
resolveRegistry.get(p.getConsistentId());
-
-            if (resolvedNode != null) {
-                resolvedNodes.add(resolvedNode);
-            } else {
-                throw new IgniteInternalException("Can't find appropriate 
cluster node for raft group peer: " + p);
-            }
-        }
-
-        return resolvedNodes;
-    }
-
-    /**
-     * Reads a list of cluster nodes from a MetaStorage entry.
-     *
-     * @param entry MetaStorage entry.
-     * @return List of cluster nodes.
-     */
-    public static Set<ClusterNode> readClusterNodes(Entry entry) {
-        if (entry.empty()) {
-            return Collections.emptySet();
-        }
-
-        return ByteUtils.fromBytes(entry.value());
-    }
-
     /**
      * Removes nodes from set of nodes.
      *
@@ -470,7 +413,7 @@ public class RebalanceUtil {
      * @param subtrahend Set of nodes to be removed.
      * @return Result of the subtraction.
      */
-    public static Set<ClusterNode> subtract(Set<ClusterNode> minuend, 
Set<ClusterNode> subtrahend) {
+    public static <T> Set<T> subtract(Set<T> minuend, Set<T> subtrahend) {
         return minuend.stream().filter(v -> 
!subtrahend.contains(v)).collect(Collectors.toSet());
     }
 
@@ -481,7 +424,7 @@ public class RebalanceUtil {
      * @param op2 Second operand.
      * @return Result of the addition.
      */
-    public static Set<ClusterNode> union(Set<ClusterNode> op1, 
Set<ClusterNode> op2) {
+    public static <T> Set<T> union(Set<T> op1, Set<T> op2) {
         var res = new HashSet<>(op1);
 
         res.addAll(op2);
@@ -496,7 +439,7 @@ public class RebalanceUtil {
      * @param op2 Second operand.
      * @return Result of the intersection.
      */
-    public static Set<ClusterNode> intersect(Set<ClusterNode> op1, Set<ClusterNode> op2) {
+    public static <T> Set<T> intersect(Set<T> op1, Set<T> op2) {
         return op1.stream().filter(op2::contains).collect(Collectors.toSet());
     }
 }
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 6d5487c3e8..03c8e3b2dd 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -55,6 +55,7 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 import org.apache.ignite.configuration.NamedListView;
 import org.apache.ignite.internal.affinity.AffinityUtils;
+import org.apache.ignite.internal.affinity.Assignment;
 import org.apache.ignite.internal.baseline.BaselineManager;
 import org.apache.ignite.internal.configuration.ConfigurationRegistry;
 import org.apache.ignite.internal.configuration.notifications.ConfigurationStorageRevisionListenerHolder;
@@ -91,7 +92,6 @@ import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
 import org.apache.ignite.internal.table.event.TableEvent;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
-import org.apache.ignite.internal.tx.LockManager;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
 import org.apache.ignite.internal.util.ByteUtils;
@@ -101,7 +101,6 @@ import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterNode;
-import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.MessagingService;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.TopologyService;
@@ -163,13 +162,9 @@ public class TableManagerTest extends IgniteAbstractTest {
     private ReplicaManager replicaMgr;
 
     /** TX manager. */
-    @Mock(lenient = true)
+    @Mock
     private TxManager tm;
 
-    /** TX manager. */
-    @Mock(lenient = true)
-    private LockManager lm;
-
     /** Meta storage manager. */
     @Mock
     MetaStorageManager msm;
@@ -178,10 +173,6 @@ public class TableManagerTest extends IgniteAbstractTest {
     @Mock
     private MessagingService messagingService;
 
-    /** Mock cluster. */
-    @Mock
-    private ClusterService cluster;
-
     /**
      * Revision listener holder. It uses for the test configurations:
      * <ul>
@@ -222,8 +213,12 @@ public class TableManagerTest extends IgniteAbstractTest {
     /** Before all test scenarios. */
     @BeforeEach
     void before() {
-        when(rm.messagingService()).thenReturn(mock(MessagingService.class));
-        when(rm.topologyService()).thenReturn(mock(TopologyService.class));
+        when(rm.messagingService()).thenReturn(messagingService);
+
+        TopologyService topologyService = mock(TopologyService.class);
+
+        when(rm.topologyService()).thenReturn(topologyService);
+        when(topologyService.localMember()).thenReturn(node);
 
         revisionUpdater = (Function<Long, CompletableFuture<?>> function) -> {
             function.apply(0L).join();
@@ -235,7 +230,7 @@ public class TableManagerTest extends IgniteAbstractTest {
             });
         };
 
-        when(msm.registerWatch(any(ByteArray.class), any())).thenReturn(CompletableFuture.completedFuture(1L));
+        when(msm.registerWatch(any(ByteArray.class), any())).thenReturn(completedFuture(1L));
 
         tblManagerFut = new CompletableFuture<>();
     }
@@ -260,8 +255,7 @@ public class TableManagerTest extends IgniteAbstractTest {
      */
     @Test
     public void testPreconfiguredTable() throws Exception {
-        when(rm.startRaftGroupService(any(), any())).thenAnswer(mock ->
-                CompletableFuture.completedFuture(mock(RaftGroupService.class)));
+        when(rm.startRaftGroupService(any(), any(), any())).thenAnswer(mock -> completedFuture(mock(RaftGroupService.class)));
 
         TableManager tableManager = createTableManager(tblManagerFut, false);
 
@@ -284,10 +278,10 @@ public class TableManagerTest extends IgniteAbstractTest {
 
                 var extConfCh = ((ExtendedTableChange) tableChange);
 
-                ArrayList<Set<ClusterNode>> assignment = new ArrayList<>(PARTITIONS);
+                var assignment = new ArrayList<Set<Assignment>>(PARTITIONS);
 
                 for (int part = 0; part < PARTITIONS; part++) {
-                    assignment.add(new HashSet<>(Collections.singleton(node)));
+                    assignment.add(new HashSet<>(Collections.singleton(Assignment.forPeer(node.name()))));
                 }
 
                 extConfCh.changeAssignments(ByteUtils.toBytes(assignment)).changeSchemaId(1);
@@ -497,7 +491,7 @@ public class TableManagerTest extends IgniteAbstractTest {
 
         TableImpl table = mockManagersAndCreateTable(scmTbl, tblManagerFut);
 
-        verify(rm, times(PARTITIONS)).startRaftGroupService(any(), any());
+        verify(rm, times(PARTITIONS)).startRaftGroupService(any(), any(), any());
 
         TableManager tableManager = tblManagerFut.join();
 
@@ -630,7 +624,7 @@ public class TableManagerTest extends IgniteAbstractTest {
     ) throws Exception {
         String consistentId = "node0";
 
-        when(rm.startRaftGroupService(any(), any())).thenAnswer(mock -> {
+        when(rm.startRaftGroupService(any(), any(), any())).thenAnswer(mock -> {
             RaftGroupService raftGrpSrvcMock = mock(RaftGroupService.class);
 
             when(raftGrpSrvcMock.leader()).thenReturn(new Peer(consistentId));
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index dd9c1fa62c..f555919264 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -76,7 +76,6 @@ import org.apache.ignite.internal.util.Lazy;
 import org.apache.ignite.internal.util.Pair;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.lang.IgniteException;
-import org.apache.ignite.network.TopologyService;
 import org.apache.ignite.raft.client.service.LeaderWithTerm;
 import org.apache.ignite.raft.client.service.RaftGroupService;
 import org.hamcrest.CustomMatcher;
@@ -180,7 +179,6 @@ public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest
                 CLOCK,
                 new PendingComparableValuesTracker<>(CLOCK.now()),
                 new TestTxStateStorage(),
-                mock(TopologyService.class),
                 mock(PlacementDriver.class),
                 peer -> true
         );
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 06bb6b469d..ba78fde628 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -265,7 +265,6 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                 clock,
                 safeTimeClock,
                 txStateStorage,
-                topologySrv,
                 placementDriver,
                 peer -> localNode.name().equals(peer.consistentId())
         );
@@ -304,7 +303,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
 
         LeaderOrTxState tuple = (LeaderOrTxState) fut.get(1, TimeUnit.SECONDS);
 
-        assertNull(tuple.leader());
+        assertNull(tuple.leaderName());
         assertNull(tuple.txMeta());
     }
 
@@ -326,7 +325,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
 
         assertEquals(TxState.COMMITED, tuple.txMeta().txState());
         assertTrue(readTimestamp.compareTo(tuple.txMeta().commitTimestamp()) > 0);
-        assertNull(tuple.leader());
+        assertNull(tuple.leaderName());
     }
 
     @Test
@@ -342,7 +341,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         LeaderOrTxState tuple = (LeaderOrTxState) fut.get(1, TimeUnit.SECONDS);
 
         assertNull(tuple.txMeta());
-        assertNotNull(tuple.leader());
+        assertNotNull(tuple.leaderName());
     }
 
     @Test
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index f459ff6043..bf02ede3fc 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -252,7 +252,6 @@ public class DummyInternalTableImpl extends InternalTableImpl {
                 clock,
                 new PendingComparableValuesTracker<>(clock.now()),
                 txStateStorage().getOrCreateTxStateStorage(0),
-                null,
                 placementDriver,
                 peer -> true
         );


Reply via email to