This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 6f29716426 IGNITE-21382 Fixed flaky 
ItPrimaryReplicaChoiceTest.testPrimaryChangeLongHandling (#3495)
6f29716426 is described below

commit 6f297164260089910a08a297c6515713c191e383
Author: Denis Chudov <[email protected]>
AuthorDate: Tue Apr 9 14:30:50 2024 +0300

    IGNITE-21382 Fixed flaky 
ItPrimaryReplicaChoiceTest.testPrimaryChangeLongHandling (#3495)
---
 .../ItPrimaryReplicaChoiceTest.java                |  22 +--
 .../MultiActorPlacementDriverTest.java             |  10 +-
 .../internal/placementdriver/LeaseUpdater.java     |  46 +++---
 .../internal/placementdriver/leases/Lease.java     |  34 ++++-
 .../negotiation/LeaseAgreement.java                |   9 ++
 .../placementdriver/PlacementDriverTest.java       |   6 +
 .../leases/LeaseSerializationTest.java             |  24 +++-
 .../ItPlacementDriverReplicaSideTest.java          |  26 ++--
 .../ignite/internal/replicator/ReplicaManager.java |  16 +--
 .../ignite/internal/table/ItDurableFinishTest.java |  16 +--
 .../apache/ignite/internal/table/NodeUtils.java    | 157 +++++++--------------
 .../raftsnapshot/ItTableRaftSnapshotsTest.java     |   2 +-
 .../table/ItTransactionPrimaryChangeTest.java      |   3 +-
 13 files changed, 194 insertions(+), 177 deletions(-)

diff --git 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
index 105180a18c..36af41713e 100644
--- 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
+++ 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
@@ -18,7 +18,9 @@
 package org.apache.ignite.internal.placementdriver;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toSet;
 import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
 import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -30,6 +32,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.Flow.Subscription;
@@ -65,7 +68,6 @@ import org.apache.ignite.table.Tuple;
 import org.apache.ignite.tx.TransactionOptions;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 
@@ -141,15 +143,14 @@ public class ItPrimaryReplicaChoiceTest extends 
ClusterPerTestIntegrationTest {
             return falseCompletedFuture();
         });
 
-        NodeUtils.transferPrimary(tbl, null, this::node);
+        NodeUtils.transferPrimary(cluster.runningNodes().collect(toSet()), 
tblReplicationGrp, null);
 
-        assertTrue(primaryChanged.get());
+        assertTrue(waitForCondition(primaryChanged::get, 10_000));
     }
 
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-21382";)
     @Test
     public void testPrimaryChangeLongHandling() throws Exception {
-        TableViewInternal tbl = (TableViewInternal) 
node(0).tables().table(TABLE_NAME);
+        TableViewInternal tbl = 
unwrapTableImpl(node(0).tables().table(TABLE_NAME));
 
         var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID);
 
@@ -172,9 +173,12 @@ public class ItPrimaryReplicaChoiceTest extends 
ClusterPerTestIntegrationTest {
 
         log.info("Primary replica is: " + primary);
 
-        NodeUtils.transferPrimary(tbl, null, this::node);
+        Collection<IgniteImpl> nodes = cluster.runningNodes().collect(toSet());
 
-        CompletableFuture<String> primaryChangeTask = 
IgniteTestUtils.runAsync(() -> NodeUtils.transferPrimary(tbl, primary, 
this::node));
+        NodeUtils.transferPrimary(nodes, tblReplicationGrp, null);
+
+        CompletableFuture<String> primaryChangeTask =
+                IgniteTestUtils.runAsync(() -> 
NodeUtils.transferPrimary(nodes, tblReplicationGrp, primary));
 
         waitingForLeaderCache(tbl, primary);
 
@@ -250,7 +254,7 @@ public class ItPrimaryReplicaChoiceTest extends 
ClusterPerTestIntegrationTest {
         
assertTrue(ignite.txManager().lockManager().locks(rwTx.id()).hasNext());
         assertEquals(6, partitionStorage.pendingCursors() + 
hashIdxStorage.pendingCursors() + sortedIdxStorage.pendingCursors());
 
-        NodeUtils.transferPrimary(tbl, null, this::node);
+        NodeUtils.transferPrimary(cluster.runningNodes().collect(toSet()), 
tblReplicationGrp, null);
 
         
assertTrue(ignite.txManager().lockManager().locks(rwTx.id()).hasNext());
         assertEquals(6, partitionStorage.pendingCursors() + 
hashIdxStorage.pendingCursors() + sortedIdxStorage.pendingCursors());
@@ -378,7 +382,7 @@ public class ItPrimaryReplicaChoiceTest extends 
ClusterPerTestIntegrationTest {
 
             Peer leader = raftSrvc.leader();
 
-            return leader != null && !leader.consistentId().equals(primary);
+            return leader != null;
         }, 10_000));
     }
 
diff --git 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
index 0823180251..03884d148d 100644
--- 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
+++ 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
@@ -427,14 +427,22 @@ public class MultiActorPlacementDriverTest extends 
BasePlacementDriverTest {
             }
         };
 
+        final Lease fLease = lease;
+        String proposedLeaseholder = nodeNames.stream().filter(n -> 
!n.equals(fLease.getLeaseholder())).findAny().orElseThrow();
+
         service.messagingService().send(
                 
clusterServices.get(activeActorRef.get()).topologyService().localMember(),
-                
PLACEMENT_DRIVER_MESSAGES_FACTORY.stopLeaseProlongationMessage().groupId(grpPart).build()
+                
PLACEMENT_DRIVER_MESSAGES_FACTORY.stopLeaseProlongationMessage()
+                        .groupId(grpPart)
+                        .redirectProposal(proposedLeaseholder)
+                        .build()
         );
 
         Lease leaseRenew = waitNewLeaseholder(grpPart, lease);
 
         log.info("Lease move from {} to {}", lease.getLeaseholder(), 
leaseRenew.getLeaseholder());
+
+        assertEquals(proposedLeaseholder, leaseRenew.getLeaseholder());
     }
 
     /**
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
index 75494fa762..bc85ce3b8f 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
@@ -212,10 +212,11 @@ public class LeaseUpdater {
      *
      * @param grpId Replication group id.
      * @param lease Lease to deny.
+     * @param redirectProposal Consistent id of the cluster node proposed for 
redirection.
      * @return Future completes true when the lease will not prolong in the 
future, false otherwise.
      */
-    private CompletableFuture<Boolean> denyLease(ReplicationGroupId grpId, 
Lease lease) {
-        Lease deniedLease = lease.denyLease();
+    private CompletableFuture<Boolean> denyLease(ReplicationGroupId grpId, 
Lease lease, String redirectProposal) {
+        Lease deniedLease = lease.denyLease(redirectProposal);
 
         leaseNegotiator.onLeaseRemoved(grpId);
 
@@ -357,7 +358,7 @@ public class LeaseUpdater {
                         publishLease(grpId, lease, renewedLeases);
 
                         continue;
-                    } else if (agreement.ready()) {
+                    } else if (agreement.isDeclined()) {
                         // Here we initiate negotiations for 
UNDEFINED_AGREEMENT and retry them on newly started active actor as well.
                         ClusterNode candidate = nextLeaseHolder(assignments, 
agreement.getRedirectTo());
 
@@ -367,8 +368,12 @@ public class LeaseUpdater {
                             continue;
                         }
 
-                        // New lease is granting.
-                        writeNewLease(grpId, lease, candidate, renewedLeases, 
toBeNegotiated);
+                        // New lease is granted.
+                        writeNewLease(grpId, candidate, renewedLeases);
+
+                        boolean force = Objects.equals(lease.getLeaseholder(), 
candidate.name());
+
+                        toBeNegotiated.put(grpId, force);
 
                         continue;
                     }
@@ -376,7 +381,11 @@ public class LeaseUpdater {
 
                 // The lease is expired or close to this.
                 if (lease.getExpirationTime().getPhysical() < 
outdatedLeaseThreshold) {
-                    ClusterNode candidate = nextLeaseHolder(assignments, 
lease.isProlongable() ? lease.getLeaseholder() : null);
+                    String proposedLeaseholder = lease.isProlongable()
+                            ? lease.getLeaseholder()
+                            : lease.proposedCandidate();
+
+                    ClusterNode candidate = nextLeaseHolder(assignments, 
proposedLeaseholder);
 
                     if (candidate == null) {
                         leaseUpdateStatistics.onLeaseWithoutCandidate();
@@ -388,10 +397,14 @@ public class LeaseUpdater {
                     // so we must start a negotiation round from the 
beginning; the same we do for the groups that don't have
                     // leaseholders at all.
                     if (isLeaseOutdated(lease)) {
-                        // New lease is granting.
-                        writeNewLease(grpId, lease, candidate, renewedLeases, 
toBeNegotiated);
+                        // New lease is granted.
+                        writeNewLease(grpId, candidate, renewedLeases);
+
+                        boolean force = !lease.isProlongable() && 
lease.proposedCandidate() != null;
+
+                        toBeNegotiated.put(grpId, force);
                     } else if (lease.isProlongable() && 
candidate.id().equals(lease.getLeaseholderId())) {
-                        // Old lease is renewing.
+                        // Old lease is renewed.
                         prolongLease(grpId, lease, renewedLeases);
                     }
                 }
@@ -442,13 +455,14 @@ public class LeaseUpdater {
          * Writes a new lease.
          *
          * @param grpId Replication group id.
-         * @param lease Old lease to apply CAS in Meta storage.
          * @param candidate Lease candidate.
          * @param renewedLeases Leases to renew.
-         * @param toBeNegotiated Leases that are required to be negotiated.
          */
-        private void writeNewLease(ReplicationGroupId grpId, Lease lease, 
ClusterNode candidate,
-                Map<ReplicationGroupId, Lease> renewedLeases, 
Map<ReplicationGroupId, Boolean> toBeNegotiated) {
+        private void writeNewLease(
+                ReplicationGroupId grpId,
+                ClusterNode candidate,
+                Map<ReplicationGroupId, Lease> renewedLeases
+        ) {
             HybridTimestamp startTs = clockService.now();
 
             var expirationTs = new HybridTimestamp(startTs.getPhysical() + 
longLeaseInterval, 0);
@@ -457,8 +471,6 @@ public class LeaseUpdater {
 
             renewedLeases.put(grpId, renewedLease);
 
-            toBeNegotiated.put(grpId, !lease.isAccepted() && 
Objects.equals(lease.getLeaseholder(), candidate.name()));
-
             leaseUpdateStatistics.onLeaseCreate();
         }
 
@@ -592,7 +604,9 @@ public class LeaseUpdater {
 
             if (msg instanceof StopLeaseProlongationMessage) {
                 if (lease.isProlongable() && 
sender.equals(lease.getLeaseholder())) {
-                    denyLease(grpId, lease).whenComplete((res, th) -> {
+                    StopLeaseProlongationMessage stopLeaseProlongationMessage 
= (StopLeaseProlongationMessage) msg;
+
+                    denyLease(grpId, lease, 
stopLeaseProlongationMessage.redirectProposal()).whenComplete((res, th) -> {
                         if (th != null) {
                             LOG.warn("Prolongation denial failed due to 
exception [groupId={}]", th, grpId);
                         } else {
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java
index 2a5ca15ca8..d613fa3e6d 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java
@@ -60,6 +60,10 @@ public class Lease implements ReplicaMeta {
     /** The lease is available to prolong in the same leaseholder. */
     private final boolean prolongable;
 
+    /** The name of a node that is proposed to be a next leaseholder. This is 
not null in case when the lease is not prolongable. */
+    @Nullable
+    private final String proposedCandidate;
+
     /** ID of replication group. */
     private final ReplicationGroupId replicationGroupId;
 
@@ -79,7 +83,7 @@ public class Lease implements ReplicaMeta {
             HybridTimestamp leaseExpirationTime,
             ReplicationGroupId replicationGroupId
     ) {
-        this(leaseholder, leaseholderId, startTime, leaseExpirationTime, 
false, false, replicationGroupId);
+        this(leaseholder, leaseholderId, startTime, leaseExpirationTime, 
false, false, null, replicationGroupId);
     }
 
     /**
@@ -91,6 +95,8 @@ public class Lease implements ReplicaMeta {
      * @param leaseExpirationTime Lease expiration timestamp.
      * @param prolong Lease is available to prolong.
      * @param accepted The flag is {@code true} when the holder accepted the 
lease.
+     * @param proposedCandidate The name of a node that is proposed to be a 
next leaseholder. This is not null in case when the lease
+     *     is not prolongable.
      * @param replicationGroupId ID of replication group.
      */
     public Lease(
@@ -100,10 +106,13 @@ public class Lease implements ReplicaMeta {
             HybridTimestamp leaseExpirationTime,
             boolean prolong,
             boolean accepted,
+            @Nullable String proposedCandidate,
             ReplicationGroupId replicationGroupId
     ) {
         assert (leaseholder == null) == (leaseholderId == null) : 
"leaseholder=" + leaseholder + ", leaseholderId=" + leaseholderId;
 
+        assert (proposedCandidate == null || !prolong) : this;
+
         this.leaseholder = leaseholder;
         this.leaseholderId = leaseholderId;
         this.startTime = startTime;
@@ -111,6 +120,7 @@ public class Lease implements ReplicaMeta {
         this.prolongable = prolong;
         this.accepted = accepted;
         this.replicationGroupId = replicationGroupId;
+        this.proposedCandidate = proposedCandidate;
     }
 
     /**
@@ -123,7 +133,7 @@ public class Lease implements ReplicaMeta {
         assert accepted : "The lease should be accepted by leaseholder before 
prolongation: [lease=" + this + ", to=" + to + ']';
         assert prolongable : "The lease should be available to prolong: 
[lease=" + this + ", to=" + to + ']';
 
-        return new Lease(leaseholder, leaseholderId, startTime, to, true, 
true, replicationGroupId);
+        return new Lease(leaseholder, leaseholderId, startTime, to, true, 
true, null, replicationGroupId);
     }
 
     /**
@@ -135,7 +145,7 @@ public class Lease implements ReplicaMeta {
     public Lease acceptLease(HybridTimestamp to) {
         assert !accepted : "The lease is already accepted: " + this;
 
-        return new Lease(leaseholder, leaseholderId, startTime, to, true, 
true, replicationGroupId);
+        return new Lease(leaseholder, leaseholderId, startTime, to, true, 
true, null, replicationGroupId);
     }
 
     /**
@@ -143,10 +153,10 @@ public class Lease implements ReplicaMeta {
      *
      * @return Denied lease.
      */
-    public Lease denyLease() {
+    public Lease denyLease(String proposedCandidate) {
         assert accepted : "The lease is not accepted: " + this;
 
-        return new Lease(leaseholder, leaseholderId, startTime, 
expirationTime, false, true, replicationGroupId);
+        return new Lease(leaseholder, leaseholderId, startTime, 
expirationTime, false, true, proposedCandidate, replicationGroupId);
     }
 
     @Override
@@ -184,6 +194,12 @@ public class Lease implements ReplicaMeta {
         return replicationGroupId;
     }
 
+    /** The name of a node that is proposed to be a next leaseholder. This is 
not null in case when the lease is not prolongable. */
+    @Nullable
+    public String proposedCandidate() {
+        return proposedCandidate;
+    }
+
     /**
      * Encodes this lease into sequence of bytes.
      *
@@ -192,11 +208,13 @@ public class Lease implements ReplicaMeta {
     public byte[] bytes() {
         byte[] leaseholderBytes = stringToBytes(leaseholder);
         byte[] leaseholderIdBytes = stringToBytes(leaseholderId);
+        byte[] proposedCandidateBytes = stringToBytes(proposedCandidate);
         byte[] groupIdBytes = toBytes(replicationGroupId);
 
         int bufSize = 2 // accepted + prolongable
                 + HYBRID_TIMESTAMP_SIZE * 2 // startTime + expirationTime
-                + bytesSizeForWrite(leaseholderBytes) + 
bytesSizeForWrite(leaseholderIdBytes) + bytesSizeForWrite(groupIdBytes);
+                + bytesSizeForWrite(leaseholderBytes) + 
bytesSizeForWrite(leaseholderIdBytes) + 
bytesSizeForWrite(proposedCandidateBytes)
+                + bytesSizeForWrite(groupIdBytes);
 
         ByteBuffer buf = ByteBuffer.allocate(bufSize).order(LITTLE_ENDIAN);
 
@@ -208,6 +226,7 @@ public class Lease implements ReplicaMeta {
 
         putBytes(buf, leaseholderBytes);
         putBytes(buf, leaseholderIdBytes);
+        putBytes(buf, proposedCandidateBytes);
         putBytes(buf, groupIdBytes);
 
         return buf.array();
@@ -230,10 +249,11 @@ public class Lease implements ReplicaMeta {
 
         String leaseholder = stringFromBytes(getBytes(buf));
         String leaseholderId = stringFromBytes(getBytes(buf));
+        String proposedCandidate = stringFromBytes(getBytes(buf));
 
         ReplicationGroupId groupId = ByteUtils.fromBytes(getBytes(buf));
 
-        return new Lease(leaseholder, leaseholderId, startTime, 
expirationTime, prolongable, accepted, groupId);
+        return new Lease(leaseholder, leaseholderId, startTime, 
expirationTime, prolongable, accepted, proposedCandidate, groupId);
     }
 
     /**
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseAgreement.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseAgreement.java
index 61fc719798..fa1fce72e9 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseAgreement.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseAgreement.java
@@ -92,6 +92,15 @@ public class LeaseAgreement {
         return false;
     }
 
+    /**
+     * Whether the agreement is declined (ready but not accepted).
+     *
+     * @return Whether the agreement is declined (ready but not accepted).
+     */
+    public boolean isDeclined() {
+        return ready() && !isAccepted();
+    }
+
     /**
      * The property matches to {@link 
LeaseGrantedMessageResponse#redirectProposal()}.
      * This property is available only when the agreement is ready (look at 
{@link #ready()}).
diff --git 
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
 
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
index 46a17c3926..6c0ec8d9a0 100644
--- 
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
+++ 
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
@@ -96,6 +96,7 @@ public class PlacementDriverTest extends 
BaseIgniteAbstractTest {
             new HybridTimestamp(5_000, 0),
             false,
             true,
+            null,
             GROUP_1
     );
 
@@ -106,6 +107,7 @@ public class PlacementDriverTest extends 
BaseIgniteAbstractTest {
             new HybridTimestamp(15_000, 0),
             false,
             true,
+            null,
             GROUP_1
     );
 
@@ -116,6 +118,7 @@ public class PlacementDriverTest extends 
BaseIgniteAbstractTest {
             new HybridTimestamp(30_000, 0),
             false,
             true,
+            null,
             GROUP_1
     );
 
@@ -307,6 +310,7 @@ public class PlacementDriverTest extends 
BaseIgniteAbstractTest {
                 new HybridTimestamp(leaseDurationMilliseconds, 0),
                 false,
                 true,
+                null,
                 GROUP_1
         );
 
@@ -328,6 +332,7 @@ public class PlacementDriverTest extends 
BaseIgniteAbstractTest {
                 new 
HybridTimestamp(newLeaseholderPhysicalStartTimeMilliseconds + 
leaseDurationMilliseconds, 0),
                 false,
                 true,
+                null,
                 GROUP_1
         );
 
@@ -623,6 +628,7 @@ public class PlacementDriverTest extends 
BaseIgniteAbstractTest {
                 new HybridTimestamp(15_000, 0),
                 false,
                 true,
+                null,
                 new TablePartitionId(groupId.tableId() + 1, 
groupId.partitionId() + 1)
         );
 
diff --git 
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseSerializationTest.java
 
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseSerializationTest.java
index c2df7905cf..43f0bee08b 100644
--- 
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseSerializationTest.java
+++ 
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseSerializationTest.java
@@ -37,17 +37,17 @@ public class LeaseSerializationTest {
 
         checksSerialization(Lease.emptyLease(groupId));
 
-        checksSerialization(newLease("node1", timestamp(now, 1), timestamp(now 
+ 1_000_000, 100), true, true, groupId));
+        checksSerialization(newLease("node1", timestamp(now, 1), timestamp(now 
+ 1_000_000, 100), true, true, null, groupId));
 
-        checksSerialization(newLease("node1", timestamp(now, 1), timestamp(now 
+ 1_000_000, 100), false, false, groupId));
+        checksSerialization(newLease("node1", timestamp(now, 1), timestamp(now 
+ 1_000_000, 100), false, false, "node2", groupId));
 
-        checksSerialization(newLease("node1", timestamp(now, 1), timestamp(now 
+ 1_000_000, 100), false, true, groupId));
+        checksSerialization(newLease("node1", timestamp(now, 1), timestamp(now 
+ 1_000_000, 100), false, true, "node2", groupId));
 
-        checksSerialization(newLease("node1", timestamp(now, 1), timestamp(now 
+ 1_000_000, 100), true, false, groupId));
+        checksSerialization(newLease("node1", timestamp(now, 1), timestamp(now 
+ 1_000_000, 100), true, false, null, groupId));
 
-        checksSerialization(newLease(null, timestamp(1, 1), timestamp(2 + 
1_000_000, 100), true, true, groupId));
+        checksSerialization(newLease(null, timestamp(1, 1), timestamp(2 + 
1_000_000, 100), true, true, null, groupId));
 
-        checksSerialization(newLease("node" + new String(new byte[1000]), 
timestamp(1, 1), timestamp(2, 100), false, false, groupId));
+        checksSerialization(newLease("node" + new String(new byte[1000]), 
timestamp(1, 1), timestamp(2, 100), false, false, null, groupId));
     }
 
     @Test
@@ -57,7 +57,15 @@ public class LeaseSerializationTest {
         ReplicationGroupId groupId = new TablePartitionId(1, 1);
 
         for (int i = 0; i < 25; i++) {
-            leases.add(newLease("node" + i, timestamp(1, i), timestamp(1, i + 
1), i % 2 == 0, i % 2 == 1, groupId));
+            leases.add(newLease(
+                    "node" + i,
+                    timestamp(1, i),
+                    timestamp(1, i + 1),
+                    i % 2 == 0,
+                    i % 2 == 1,
+                    i % 2 == 0 ? null : "node" + i,
+                    groupId
+            ));
         }
 
         byte[] leaseBatchBytes = new LeaseBatch(leases).bytes();
@@ -75,6 +83,7 @@ public class LeaseSerializationTest {
             HybridTimestamp expirationTime,
             boolean prolong,
             boolean accepted,
+            @Nullable String proposedCandidate,
             ReplicationGroupId replicationGroupId
     ) {
         return new Lease(
@@ -84,6 +93,7 @@ public class LeaseSerializationTest {
                 expirationTime,
                 prolong,
                 accepted,
+                proposedCandidate,
                 replicationGroupId
         );
     }
diff --git 
a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
 
b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
index fddbd55a5d..bc86533795 100644
--- 
a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
+++ 
b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.replicator;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.stream.Collectors.toSet;
 import static 
org.apache.ignite.internal.raft.PeersAndLearners.fromConsistentIds;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
@@ -31,6 +32,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.Closeable;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -44,6 +46,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -75,6 +78,7 @@ import 
org.apache.ignite.internal.raft.server.RaftGroupOptions;
 import 
org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration;
 import org.apache.ignite.internal.replicator.message.ReplicaMessageTestGroup;
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import 
org.apache.ignite.internal.replicator.message.TestReplicaMessagesFactory;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
@@ -129,6 +133,8 @@ public class ItPlacementDriverReplicaSideTest extends 
IgniteAbstractTest {
     /** List of services to have to close before the test will be completed. */
     private final List<Closeable> servicesToClose = new ArrayList<>();
 
+    private BiFunction<ReplicaRequest, String, 
CompletableFuture<ReplicaResult>> replicaListener = null;
+
     @BeforeEach
     public void beforeTest(TestInfo testInfo) {
         partitionOperationsExecutor = new ThreadPoolExecutor(
@@ -212,6 +218,8 @@ public class ItPlacementDriverReplicaSideTest extends 
IgniteAbstractTest {
     @AfterEach
     public void afterTest() throws Exception {
         IgniteUtils.closeAll(servicesToClose);
+
+        replicaListener = null;
     }
 
     /**
@@ -263,7 +271,7 @@ public class ItPlacementDriverReplicaSideTest extends 
IgniteAbstractTest {
     }
 
     @Test
-    public void testNotificationToPlacementDriverAboutChangeLeader() throws 
Exception {
+    public void testNotificationToPlacementDriverAboutConnectivityProblem() 
throws Exception {
         Set<String> grpNodes = chooseRandomNodes(3);
 
         log.info("Replication group is based on {}", grpNodes);
@@ -276,24 +284,20 @@ public class ItPlacementDriverReplicaSideTest extends 
IgniteAbstractTest {
 
         var leaderNodeName = raftClient.leader().consistentId();
 
-        var newLeaderNodeName = grpNodes.stream().filter(n -> 
!n.equals(leaderNodeName)).findAny().get();
-
-        log.info("Leader is moving form {} to {}", leaderNodeName, 
newLeaderNodeName);
-
         ConcurrentHashMap<String, String> nodesToReceivedDeclineMsg = new 
ConcurrentHashMap<>();
 
         denyLeaseHandler = (msg, from, to) -> {
             nodesToReceivedDeclineMsg.put(to, from);
         };
 
-        raftClient.transferLeadership(new Peer(newLeaderNodeName)).get();
-
         var anyNode = randomNode(Set.of());
 
         log.info("Message sent from {} to {}", anyNode, leaderNodeName);
 
         var clusterService = clusterServices.get(anyNode);
 
+        replicaListener = (request, sender) -> failedFuture(new 
IOException("test"));
+
         new ReplicaService(
                 clusterService.messagingService(),
                 clock,
@@ -483,7 +487,13 @@ public class ItPlacementDriverReplicaSideTest extends 
IgniteAbstractTest {
                                 log.info("Handle request [type={}]", 
request.getClass().getSimpleName());
 
                                 return 
raftClient.run(REPLICA_MESSAGES_FACTORY.safeTimeSyncCommand().build())
-                                        .thenApply(ignored -> new 
ReplicaResult(null, null));
+                                        .thenCompose(ignored -> {
+                                            if (replicaListener == null) {
+                                                return completedFuture(new 
ReplicaResult(null, null));
+                                            } else {
+                                                return 
replicaListener.apply(request, senderId);
+                                            }
+                                        });
                             },
                             raftClient,
                             new 
PendingComparableValuesTracker<>(Long.MAX_VALUE));
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 411cc501ac..182e00537b 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -263,9 +263,9 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
                 return true;
             }
 
-            // It's something else: either a JRE thread or an Ignite thread 
not marked with ThreadAttributes. In any case,
-            // let it proceed. If it touches a storage, we'll see an assertion.
-            return false;
+            // It's something else: either a JRE thread or an Ignite thread 
not marked with ThreadAttributes. As we are not sure,
+            // let's switch: false negative can produce assertion errors.
+            return true;
         }
     }
 
@@ -351,14 +351,8 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
 
                 clusterNetSvc.messagingService().respond(senderConsistentId, 
msg, correlationId);
 
-                if (request instanceof PrimaryReplicaRequest) {
-                    ClusterNode localNode = 
clusterNetSvc.topologyService().localMember();
-
-                    if (!localNode.name().equals(replica.proposedPrimary())) {
-                        stopLeaseProlongation(request.groupId(), 
replica.proposedPrimary());
-                    } else if (isConnectivityRelatedException(ex)) {
-                        stopLeaseProlongation(request.groupId(), null);
-                    }
+                if (request instanceof PrimaryReplicaRequest && 
isConnectivityRelatedException(ex)) {
+                    stopLeaseProlongation(request.groupId(), null);
                 }
 
                 if (ex == null && res.replicationFuture() != null) {
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
index 429af61f43..119a1ab3d1 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
@@ -18,8 +18,8 @@
 package org.apache.ignite.internal.table;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toSet;
 import static org.apache.ignite.internal.SessionUtils.executeUpdate;
-import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
 import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.bypassingThreadAssertions;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
@@ -146,7 +146,7 @@ public class ItDurableFinishTest extends 
ClusterPerTestIntegrationTest {
 
         // Drop all finish messages to the old primary, pick a new one.
         // The coordinator will get a response from the new primary.
-        CompletableFuture<Void> transferPrimaryFuture = 
changePrimaryOnFinish(context.coordinatorNode, context.publicTable);
+        CompletableFuture<Void> transferPrimaryFuture = 
changePrimaryOnFinish(context.coordinatorNode);
 
         // The primary is changed after calculating the outcome and commit 
timestamp.
         // The new primary successfully commits such transaction.
@@ -155,8 +155,7 @@ public class ItDurableFinishTest extends 
ClusterPerTestIntegrationTest {
         assertThat(transferPrimaryFuture, willCompleteSuccessfully());
     }
 
-    private CompletableFuture<Void> changePrimaryOnFinish(IgniteImpl 
coordinatorNode, Table publicTable) {
-        TableImpl tableImpl = unwrapTableImpl(publicTable);
+    private CompletableFuture<Void> changePrimaryOnFinish(IgniteImpl 
coordinatorNode) {
         DefaultMessagingService coordinatorMessaging = 
messaging(coordinatorNode);
 
         AtomicBoolean dropMessage = new AtomicBoolean(true);
@@ -186,7 +185,7 @@ public class ItDurableFinishTest extends 
ClusterPerTestIntegrationTest {
 
                 logger().info("Start transferring primary.");
 
-                NodeUtils.transferPrimary(tableImpl, null, this::node);
+                
NodeUtils.transferPrimary(cluster.runningNodes().collect(toSet()), 
defaultTablePartitionId(node(0)), null);
             } catch (InterruptedException e) {
                 throw new RuntimeException(e);
             } finally {
@@ -248,15 +247,14 @@ public class ItDurableFinishTest extends 
ClusterPerTestIntegrationTest {
         Context context = prepareTransactionData();
 
         // The transaction is committed but the primary expires right before 
applying the cleanup message.
-        CompletableFuture<Void> transferPrimaryFuture = 
changePrimaryOnCleanup(context.primaryNode, context.publicTable);
+        CompletableFuture<Void> transferPrimaryFuture = 
changePrimaryOnCleanup(context.primaryNode);
 
         commitAndValidate(context.tx, context.publicTable, context.keyTpl);
 
         assertThat(transferPrimaryFuture, willCompleteSuccessfully());
     }
 
-    private CompletableFuture<Void> changePrimaryOnCleanup(IgniteImpl 
primaryNode, Table publicTable) {
-        TableImpl tableImpl = unwrapTableImpl(publicTable);
+    private CompletableFuture<Void> changePrimaryOnCleanup(IgniteImpl 
primaryNode) {
         DefaultMessagingService primaryMessaging = messaging(primaryNode);
 
         AtomicBoolean dropMessage = new AtomicBoolean(true);
@@ -286,7 +284,7 @@ public class ItDurableFinishTest extends 
ClusterPerTestIntegrationTest {
 
                 logger().info("Start transferring primary.");
 
-                NodeUtils.transferPrimary(tableImpl, null, this::node);
+                
NodeUtils.transferPrimary(cluster.runningNodes().collect(toSet()), 
defaultTablePartitionId(node(0)), null);
             } catch (InterruptedException e) {
                 throw new RuntimeException(e);
             } finally {
diff --git 
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/table/NodeUtils.java
 
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/table/NodeUtils.java
index 4494563f3a..6dfb2ff26f 100644
--- 
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/table/NodeUtils.java
+++ 
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/table/NodeUtils.java
@@ -20,25 +20,18 @@ package org.apache.ignite.internal.table;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
-import static 
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.IntFunction;
-import java.util.stream.IntStream;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.placementdriver.ReplicaMeta;
 import 
org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
 import 
org.apache.ignite.internal.placementdriver.message.StopLeaseProlongationMessage;
-import org.apache.ignite.internal.raft.Peer;
-import org.apache.ignite.internal.raft.service.RaftGroupService;
-import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -54,117 +47,67 @@ public class NodeUtils {
     /**
      * Transfers the primary rights to another node.
      *
-     * @param tbl Table.
+     * @param nodes Nodes collection.
+     * @param groupId Group id.
      * @param preferablePrimary Primary replica name which is preferred for 
being primary or {@code null}.
-     * @return Future which points to a new primary replica name.
+     * @return New primary replica name.
      * @throws InterruptedException If failed.
      */
-    // TODO: IGNITE-20365: Replace this method when proper primary change 
method is implemented.
-    public static String transferPrimary(TableViewInternal tbl, @Nullable 
String preferablePrimary, IntFunction<IgniteImpl> nodes)
-            throws InterruptedException {
-        var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0);
-
-        CompletableFuture<ReplicaMeta> primaryReplicaFut = 
nodes.apply(0).placementDriver().awaitPrimaryReplica(
-                tblReplicationGrp,
-                nodes.apply(0).clock().now(),
-                AWAIT_PRIMARY_REPLICA_TIMEOUT,
-                SECONDS
-        );
-
-        assertThat(primaryReplicaFut, willCompleteSuccessfully());
-
-        String primary = primaryReplicaFut.join().getLeaseholder();
-
-        if (preferablePrimary != null && preferablePrimary.equals(primary)) {
-            return primary;
+    public static String transferPrimary(
+            Collection<IgniteImpl> nodes,
+            ReplicationGroupId groupId,
+            @Nullable String preferablePrimary
+    ) throws InterruptedException {
+        LOG.info("Moving the primary replica [preferablePrimary=" + 
preferablePrimary + "].");
+
+        IgniteImpl node = nodes.stream().findAny().orElseThrow();
+
+        ReplicaMeta currentLeaseholder = leaseholder(node, groupId);
+
+        IgniteImpl leaseholderNode = nodes.stream()
+                .filter(n -> 
n.id().equals(currentLeaseholder.getLeaseholderId()))
+                .findFirst().orElseThrow();
+
+        if (preferablePrimary == null) {
+            preferablePrimary = nodes.stream()
+                    .map(IgniteImpl::name)
+                    .filter(n -> n.equals(currentLeaseholder.getLeaseholder()))
+                    .findFirst()
+                    .orElseThrow();
         }
 
-        // Change leader for the replication group.
-
-        RaftGroupService raftSrvc = 
tbl.internalTable().tableRaftService().partitionRaftGroupService(0);
-
-        raftSrvc.refreshLeader();
+        String finalPreferablePrimary = preferablePrimary;
 
-        Peer leader = raftSrvc.leader();
+        StopLeaseProlongationMessage msg = 
PLACEMENT_DRIVER_MESSAGES_FACTORY.stopLeaseProlongationMessage()
+                .groupId(groupId)
+                .redirectProposal(preferablePrimary)
+                .build();
 
-        Peer newLeader = null;
-
-        if (preferablePrimary != null) {
-            for (Peer peer : raftSrvc.peers()) {
-                if (peer.consistentId().equals(preferablePrimary)) {
-                    newLeader = peer;
-                }
-            }
-        }
-
-        if (newLeader == null) {
-            for (Peer peer : raftSrvc.peers()) {
-                if (!leader.equals(peer)) {
-                    newLeader = peer;
-                }
-            }
-        }
+        nodes.forEach(
+                n -> 
leaseholderNode.clusterService().messagingService().send(n.clusterService().topologyService().localMember(),
 msg)
+        );
 
-        assertNotNull(newLeader);
+        assertTrue(waitForCondition(() -> {
+            ReplicaMeta newPrimaryReplica = leaseholder(node, groupId);
 
-        assertThat(raftSrvc.transferLeadership(newLeader), 
willCompleteSuccessfully());
+            return 
newPrimaryReplica.getLeaseholder().equals(finalPreferablePrimary);
+        }, 10_000));
 
-        LOG.info("Leader moved [from={}, to={}]", leader, newLeader);
+        LOG.info("Primary replica moved successfully.");
 
-        // Leader changed.
+        return finalPreferablePrimary;
+    }
 
-        AtomicReference<String> newLeaseholder = new AtomicReference<>();
+    private static ReplicaMeta leaseholder(IgniteImpl node, ReplicationGroupId 
groupId) {
+        CompletableFuture<ReplicaMeta> leaseholderFuture = 
node.placementDriver().awaitPrimaryReplica(
+                groupId,
+                node.clock().now(),
+                AWAIT_PRIMARY_REPLICA_TIMEOUT,
+                SECONDS
+        );
 
-        AtomicLong lastInsertAttempt = new AtomicLong();
+        assertThat(leaseholderFuture, willCompleteSuccessfully());
 
-        assertTrue(waitForCondition(() -> {
-            CompletableFuture<ReplicaMeta> newPrimaryReplicaFut = 
nodes.apply(0).placementDriver().awaitPrimaryReplica(
-                    tblReplicationGrp,
-                    nodes.apply(0).clock().now(),
-                    AWAIT_PRIMARY_REPLICA_TIMEOUT,
-                    SECONDS
-            );
-
-            assertThat(newPrimaryReplicaFut, willCompleteSuccessfully());
-
-            if (!primary.equals(newPrimaryReplicaFut.join().getLeaseholder())) 
{
-                
newLeaseholder.set(newPrimaryReplicaFut.join().getLeaseholder());
-
-                return true;
-            } else {
-                // Insert is needed to notify the placement driver about a 
leader for the group was changed.
-                try {
-                    long lastTs = lastInsertAttempt.get();
-
-                    if (coarseCurrentTimeMillis() - lastTs > 1_000 && 
lastInsertAttempt.compareAndSet(lastTs, coarseCurrentTimeMillis())) {
-                        int nodeCount = nodes.apply(0).clusterNodes().size();
-
-                        IgniteImpl primaryNode = IntStream.range(0, nodeCount)
-                                .mapToObj(i -> nodes.apply(i))
-                                .filter(n -> n.name().equals(primary))
-                                .findAny().orElseThrow();
-
-                        StopLeaseProlongationMessage msg = 
PLACEMENT_DRIVER_MESSAGES_FACTORY.stopLeaseProlongationMessage()
-                                .groupId(tblReplicationGrp)
-                                .build();
-
-                        IntStream.range(0, nodeCount)
-                                .mapToObj(i -> nodes.apply(i))
-                                .forEach(n -> 
primaryNode.clusterService().messagingService().send(
-                                        
n.clusterService().topologyService().localMember(),
-                                        msg
-                                ));
-                    }
-                } catch (Exception e) {
-                    LOG.error("Failed to perform insert", e);
-                }
-
-                return false;
-            }
-        }, 60_000));
-
-        LOG.info("Primary replica moved [from={}, to={}]", primary, 
newLeaseholder.get());
-
-        return newLeaseholder.get();
+        return leaseholderFuture.join();
     }
 }
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
index 391616b25e..cabe6f8bec 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
@@ -309,7 +309,7 @@ class ItTableRaftSnapshotsTest extends 
IgniteIntegrationTest {
         RaftGroupService raftGroupService = 
cluster.leaderServiceFor(tablePartitionId);
 
         assertThat(
-                "Unexpected leadership change",
+                "Unexpected leadership change on group: " + tablePartitionId,
                 raftGroupService.getServerId().getConsistentId(), 
is(cluster.node(expectedLeaderNodeIndex).name())
         );
 
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionPrimaryChangeTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionPrimaryChangeTest.java
index 29aeab0ec8..cf21fb76eb 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionPrimaryChangeTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionPrimaryChangeTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.table;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.SessionUtils.executeUpdate;
 import static org.apache.ignite.internal.TestWrappers.unwrapIgniteTransaction;
 import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
@@ -162,7 +163,7 @@ public class ItTransactionPrimaryChangeTest extends 
ClusterPerTestIntegrationTes
             assertThat(fullTxReplicationAttemptFuture, 
willCompleteSuccessfully());
 
             // Changing the primary.
-            NodeUtils.transferPrimary(tbl, txCrdNode.name(), cluster::node);
+            
NodeUtils.transferPrimary(cluster.runningNodes().collect(toList()), 
tblReplicationGrp, txCrdNode.name());
 
             // Start a regular transaction that increments the value. It 
should see the initially inserted value and its commit should
             // succeed.

Reply via email to