This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 6f29716426 IGNITE-21382 Fixed flaky
ItPrimaryReplicaChoiceTest.testPrimaryChangeLongHandling (#3495)
6f29716426 is described below
commit 6f297164260089910a08a297c6515713c191e383
Author: Denis Chudov <[email protected]>
AuthorDate: Tue Apr 9 14:30:50 2024 +0300
IGNITE-21382 Fixed flaky
ItPrimaryReplicaChoiceTest.testPrimaryChangeLongHandling (#3495)
---
.../ItPrimaryReplicaChoiceTest.java | 22 +--
.../MultiActorPlacementDriverTest.java | 10 +-
.../internal/placementdriver/LeaseUpdater.java | 46 +++---
.../internal/placementdriver/leases/Lease.java | 34 ++++-
.../negotiation/LeaseAgreement.java | 9 ++
.../placementdriver/PlacementDriverTest.java | 6 +
.../leases/LeaseSerializationTest.java | 24 +++-
.../ItPlacementDriverReplicaSideTest.java | 26 ++--
.../ignite/internal/replicator/ReplicaManager.java | 16 +--
.../ignite/internal/table/ItDurableFinishTest.java | 16 +--
.../apache/ignite/internal/table/NodeUtils.java | 157 +++++++--------------
.../raftsnapshot/ItTableRaftSnapshotsTest.java | 2 +-
.../table/ItTransactionPrimaryChangeTest.java | 3 +-
13 files changed, 194 insertions(+), 177 deletions(-)
diff --git
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
index 105180a18c..36af41713e 100644
---
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
+++
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
@@ -18,7 +18,9 @@
package org.apache.ignite.internal.placementdriver;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -30,6 +32,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscription;
@@ -65,7 +68,6 @@ import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.TransactionOptions;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
@@ -141,15 +143,14 @@ public class ItPrimaryReplicaChoiceTest extends
ClusterPerTestIntegrationTest {
return falseCompletedFuture();
});
- NodeUtils.transferPrimary(tbl, null, this::node);
+ NodeUtils.transferPrimary(cluster.runningNodes().collect(toSet()),
tblReplicationGrp, null);
- assertTrue(primaryChanged.get());
+ assertTrue(waitForCondition(primaryChanged::get, 10_000));
}
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-21382")
@Test
public void testPrimaryChangeLongHandling() throws Exception {
- TableViewInternal tbl = (TableViewInternal)
node(0).tables().table(TABLE_NAME);
+ TableViewInternal tbl =
unwrapTableImpl(node(0).tables().table(TABLE_NAME));
var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID);
@@ -172,9 +173,12 @@ public class ItPrimaryReplicaChoiceTest extends
ClusterPerTestIntegrationTest {
log.info("Primary replica is: " + primary);
- NodeUtils.transferPrimary(tbl, null, this::node);
+ Collection<IgniteImpl> nodes = cluster.runningNodes().collect(toSet());
- CompletableFuture<String> primaryChangeTask =
IgniteTestUtils.runAsync(() -> NodeUtils.transferPrimary(tbl, primary,
this::node));
+ NodeUtils.transferPrimary(nodes, tblReplicationGrp, null);
+
+ CompletableFuture<String> primaryChangeTask =
+ IgniteTestUtils.runAsync(() ->
NodeUtils.transferPrimary(nodes, tblReplicationGrp, primary));
waitingForLeaderCache(tbl, primary);
@@ -250,7 +254,7 @@ public class ItPrimaryReplicaChoiceTest extends
ClusterPerTestIntegrationTest {
assertTrue(ignite.txManager().lockManager().locks(rwTx.id()).hasNext());
assertEquals(6, partitionStorage.pendingCursors() +
hashIdxStorage.pendingCursors() + sortedIdxStorage.pendingCursors());
- NodeUtils.transferPrimary(tbl, null, this::node);
+ NodeUtils.transferPrimary(cluster.runningNodes().collect(toSet()),
tblReplicationGrp, null);
assertTrue(ignite.txManager().lockManager().locks(rwTx.id()).hasNext());
assertEquals(6, partitionStorage.pendingCursors() +
hashIdxStorage.pendingCursors() + sortedIdxStorage.pendingCursors());
@@ -378,7 +382,7 @@ public class ItPrimaryReplicaChoiceTest extends
ClusterPerTestIntegrationTest {
Peer leader = raftSrvc.leader();
- return leader != null && !leader.consistentId().equals(primary);
+ return leader != null;
}, 10_000));
}
diff --git
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
index 0823180251..03884d148d 100644
---
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
+++
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
@@ -427,14 +427,22 @@ public class MultiActorPlacementDriverTest extends
BasePlacementDriverTest {
}
};
+ final Lease fLease = lease;
+ String proposedLeaseholder = nodeNames.stream().filter(n ->
!n.equals(fLease.getLeaseholder())).findAny().orElseThrow();
+
service.messagingService().send(
clusterServices.get(activeActorRef.get()).topologyService().localMember(),
-
PLACEMENT_DRIVER_MESSAGES_FACTORY.stopLeaseProlongationMessage().groupId(grpPart).build()
+
PLACEMENT_DRIVER_MESSAGES_FACTORY.stopLeaseProlongationMessage()
+ .groupId(grpPart)
+ .redirectProposal(proposedLeaseholder)
+ .build()
);
Lease leaseRenew = waitNewLeaseholder(grpPart, lease);
log.info("Lease move from {} to {}", lease.getLeaseholder(),
leaseRenew.getLeaseholder());
+
+ assertEquals(proposedLeaseholder, leaseRenew.getLeaseholder());
}
/**
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
index 75494fa762..bc85ce3b8f 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
@@ -212,10 +212,11 @@ public class LeaseUpdater {
*
* @param grpId Replication group id.
* @param lease Lease to deny.
+ * @param redirectProposal Consistent id of the cluster node proposed for
redirection.
* @return Future completes true when the lease will not prolong in the
future, false otherwise.
*/
- private CompletableFuture<Boolean> denyLease(ReplicationGroupId grpId,
Lease lease) {
- Lease deniedLease = lease.denyLease();
+ private CompletableFuture<Boolean> denyLease(ReplicationGroupId grpId,
Lease lease, String redirectProposal) {
+ Lease deniedLease = lease.denyLease(redirectProposal);
leaseNegotiator.onLeaseRemoved(grpId);
@@ -357,7 +358,7 @@ public class LeaseUpdater {
publishLease(grpId, lease, renewedLeases);
continue;
- } else if (agreement.ready()) {
+ } else if (agreement.isDeclined()) {
// Here we initiate negotiations for
UNDEFINED_AGREEMENT and retry them on newly started active actor as well.
ClusterNode candidate = nextLeaseHolder(assignments,
agreement.getRedirectTo());
@@ -367,8 +368,12 @@ public class LeaseUpdater {
continue;
}
- // New lease is granting.
- writeNewLease(grpId, lease, candidate, renewedLeases,
toBeNegotiated);
+ // New lease is granted.
+ writeNewLease(grpId, candidate, renewedLeases);
+
+ boolean force = Objects.equals(lease.getLeaseholder(),
candidate.name());
+
+ toBeNegotiated.put(grpId, force);
continue;
}
@@ -376,7 +381,11 @@ public class LeaseUpdater {
// The lease is expired or close to this.
if (lease.getExpirationTime().getPhysical() <
outdatedLeaseThreshold) {
- ClusterNode candidate = nextLeaseHolder(assignments,
lease.isProlongable() ? lease.getLeaseholder() : null);
+ String proposedLeaseholder = lease.isProlongable()
+ ? lease.getLeaseholder()
+ : lease.proposedCandidate();
+
+ ClusterNode candidate = nextLeaseHolder(assignments,
proposedLeaseholder);
if (candidate == null) {
leaseUpdateStatistics.onLeaseWithoutCandidate();
@@ -388,10 +397,14 @@ public class LeaseUpdater {
// so we must start a negotiation round from the
beginning; the same we do for the groups that don't have
// leaseholders at all.
if (isLeaseOutdated(lease)) {
- // New lease is granting.
- writeNewLease(grpId, lease, candidate, renewedLeases,
toBeNegotiated);
+ // New lease is granted.
+ writeNewLease(grpId, candidate, renewedLeases);
+
+ boolean force = !lease.isProlongable() &&
lease.proposedCandidate() != null;
+
+ toBeNegotiated.put(grpId, force);
} else if (lease.isProlongable() &&
candidate.id().equals(lease.getLeaseholderId())) {
- // Old lease is renewing.
+ // Old lease is renewed.
prolongLease(grpId, lease, renewedLeases);
}
}
@@ -442,13 +455,14 @@ public class LeaseUpdater {
* Writes a new lease.
*
* @param grpId Replication group id.
- * @param lease Old lease to apply CAS in Meta storage.
* @param candidate Lease candidate.
* @param renewedLeases Leases to renew.
- * @param toBeNegotiated Leases that are required to be negotiated.
*/
- private void writeNewLease(ReplicationGroupId grpId, Lease lease,
ClusterNode candidate,
- Map<ReplicationGroupId, Lease> renewedLeases,
Map<ReplicationGroupId, Boolean> toBeNegotiated) {
+ private void writeNewLease(
+ ReplicationGroupId grpId,
+ ClusterNode candidate,
+ Map<ReplicationGroupId, Lease> renewedLeases
+ ) {
HybridTimestamp startTs = clockService.now();
var expirationTs = new HybridTimestamp(startTs.getPhysical() +
longLeaseInterval, 0);
@@ -457,8 +471,6 @@ public class LeaseUpdater {
renewedLeases.put(grpId, renewedLease);
- toBeNegotiated.put(grpId, !lease.isAccepted() &&
Objects.equals(lease.getLeaseholder(), candidate.name()));
-
leaseUpdateStatistics.onLeaseCreate();
}
@@ -592,7 +604,9 @@ public class LeaseUpdater {
if (msg instanceof StopLeaseProlongationMessage) {
if (lease.isProlongable() &&
sender.equals(lease.getLeaseholder())) {
- denyLease(grpId, lease).whenComplete((res, th) -> {
+ StopLeaseProlongationMessage stopLeaseProlongationMessage
= (StopLeaseProlongationMessage) msg;
+
+ denyLease(grpId, lease,
stopLeaseProlongationMessage.redirectProposal()).whenComplete((res, th) -> {
if (th != null) {
LOG.warn("Prolongation denial failed due to
exception [groupId={}]", th, grpId);
} else {
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java
index 2a5ca15ca8..d613fa3e6d 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java
@@ -60,6 +60,10 @@ public class Lease implements ReplicaMeta {
/** The lease is available to prolong in the same leaseholder. */
private final boolean prolongable;
+ /** The name of a node that is proposed to be a next leaseholder. Can be
non-null only when the lease is not prolongable. */
+ @Nullable
+ private final String proposedCandidate;
+
/** ID of replication group. */
private final ReplicationGroupId replicationGroupId;
@@ -79,7 +83,7 @@ public class Lease implements ReplicaMeta {
HybridTimestamp leaseExpirationTime,
ReplicationGroupId replicationGroupId
) {
- this(leaseholder, leaseholderId, startTime, leaseExpirationTime,
false, false, replicationGroupId);
+ this(leaseholder, leaseholderId, startTime, leaseExpirationTime,
false, false, null, replicationGroupId);
}
/**
@@ -91,6 +95,8 @@ public class Lease implements ReplicaMeta {
* @param leaseExpirationTime Lease expiration timestamp.
* @param prolong Lease is available to prolong.
* @param accepted The flag is {@code true} when the holder accepted the
lease.
+ * @param proposedCandidate The name of a node that is proposed to be a
next leaseholder. Can be non-null only when the lease
+ * is not prolongable.
* @param replicationGroupId ID of replication group.
*/
public Lease(
@@ -100,10 +106,13 @@ public class Lease implements ReplicaMeta {
HybridTimestamp leaseExpirationTime,
boolean prolong,
boolean accepted,
+ @Nullable String proposedCandidate,
ReplicationGroupId replicationGroupId
) {
assert (leaseholder == null) == (leaseholderId == null) :
"leaseholder=" + leaseholder + ", leaseholderId=" + leaseholderId;
+ assert (proposedCandidate == null || !prolong) : this;
+
this.leaseholder = leaseholder;
this.leaseholderId = leaseholderId;
this.startTime = startTime;
@@ -111,6 +120,7 @@ public class Lease implements ReplicaMeta {
this.prolongable = prolong;
this.accepted = accepted;
this.replicationGroupId = replicationGroupId;
+ this.proposedCandidate = proposedCandidate;
}
/**
@@ -123,7 +133,7 @@ public class Lease implements ReplicaMeta {
assert accepted : "The lease should be accepted by leaseholder before
prolongation: [lease=" + this + ", to=" + to + ']';
assert prolongable : "The lease should be available to prolong:
[lease=" + this + ", to=" + to + ']';
- return new Lease(leaseholder, leaseholderId, startTime, to, true,
true, replicationGroupId);
+ return new Lease(leaseholder, leaseholderId, startTime, to, true,
true, null, replicationGroupId);
}
/**
@@ -135,7 +145,7 @@ public class Lease implements ReplicaMeta {
public Lease acceptLease(HybridTimestamp to) {
assert !accepted : "The lease is already accepted: " + this;
- return new Lease(leaseholder, leaseholderId, startTime, to, true,
true, replicationGroupId);
+ return new Lease(leaseholder, leaseholderId, startTime, to, true,
true, null, replicationGroupId);
}
/**
@@ -143,10 +153,10 @@ public class Lease implements ReplicaMeta {
*
* @return Denied lease.
*/
- public Lease denyLease() {
+ public Lease denyLease(String proposedCandidate) {
assert accepted : "The lease is not accepted: " + this;
- return new Lease(leaseholder, leaseholderId, startTime,
expirationTime, false, true, replicationGroupId);
+ return new Lease(leaseholder, leaseholderId, startTime,
expirationTime, false, true, proposedCandidate, replicationGroupId);
}
@Override
@@ -184,6 +194,12 @@ public class Lease implements ReplicaMeta {
return replicationGroupId;
}
+ /** The name of a node that is proposed to be a next leaseholder. Can be
non-null only when the lease is not prolongable. */
+ @Nullable
+ public String proposedCandidate() {
+ return proposedCandidate;
+ }
+
/**
* Encodes this lease into sequence of bytes.
*
@@ -192,11 +208,13 @@ public class Lease implements ReplicaMeta {
public byte[] bytes() {
byte[] leaseholderBytes = stringToBytes(leaseholder);
byte[] leaseholderIdBytes = stringToBytes(leaseholderId);
+ byte[] proposedCandidateBytes = stringToBytes(proposedCandidate);
byte[] groupIdBytes = toBytes(replicationGroupId);
int bufSize = 2 // accepted + prolongable
+ HYBRID_TIMESTAMP_SIZE * 2 // startTime + expirationTime
- + bytesSizeForWrite(leaseholderBytes) +
bytesSizeForWrite(leaseholderIdBytes) + bytesSizeForWrite(groupIdBytes);
+ + bytesSizeForWrite(leaseholderBytes) +
bytesSizeForWrite(leaseholderIdBytes) +
bytesSizeForWrite(proposedCandidateBytes)
+ + bytesSizeForWrite(groupIdBytes);
ByteBuffer buf = ByteBuffer.allocate(bufSize).order(LITTLE_ENDIAN);
@@ -208,6 +226,7 @@ public class Lease implements ReplicaMeta {
putBytes(buf, leaseholderBytes);
putBytes(buf, leaseholderIdBytes);
+ putBytes(buf, proposedCandidateBytes);
putBytes(buf, groupIdBytes);
return buf.array();
@@ -230,10 +249,11 @@ public class Lease implements ReplicaMeta {
String leaseholder = stringFromBytes(getBytes(buf));
String leaseholderId = stringFromBytes(getBytes(buf));
+ String proposedCandidate = stringFromBytes(getBytes(buf));
ReplicationGroupId groupId = ByteUtils.fromBytes(getBytes(buf));
- return new Lease(leaseholder, leaseholderId, startTime,
expirationTime, prolongable, accepted, groupId);
+ return new Lease(leaseholder, leaseholderId, startTime,
expirationTime, prolongable, accepted, proposedCandidate, groupId);
}
/**
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseAgreement.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseAgreement.java
index 61fc719798..fa1fce72e9 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseAgreement.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseAgreement.java
@@ -92,6 +92,15 @@ public class LeaseAgreement {
return false;
}
+ /**
+ * Whether the agreement is declined (ready but not accepted).
+ *
+ * @return Whether the agreement is declined (ready but not accepted).
+ */
+ public boolean isDeclined() {
+ return ready() && !isAccepted();
+ }
+
/**
* The property matches to {@link
LeaseGrantedMessageResponse#redirectProposal()}.
* This property is available only when the agreement is ready (look at
{@link #ready()}).
diff --git
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
index 46a17c3926..6c0ec8d9a0 100644
---
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
+++
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
@@ -96,6 +96,7 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
new HybridTimestamp(5_000, 0),
false,
true,
+ null,
GROUP_1
);
@@ -106,6 +107,7 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
new HybridTimestamp(15_000, 0),
false,
true,
+ null,
GROUP_1
);
@@ -116,6 +118,7 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
new HybridTimestamp(30_000, 0),
false,
true,
+ null,
GROUP_1
);
@@ -307,6 +310,7 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
new HybridTimestamp(leaseDurationMilliseconds, 0),
false,
true,
+ null,
GROUP_1
);
@@ -328,6 +332,7 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
new
HybridTimestamp(newLeaseholderPhysicalStartTimeMilliseconds +
leaseDurationMilliseconds, 0),
false,
true,
+ null,
GROUP_1
);
@@ -623,6 +628,7 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
new HybridTimestamp(15_000, 0),
false,
true,
+ null,
new TablePartitionId(groupId.tableId() + 1,
groupId.partitionId() + 1)
);
diff --git
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseSerializationTest.java
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseSerializationTest.java
index c2df7905cf..43f0bee08b 100644
---
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseSerializationTest.java
+++
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseSerializationTest.java
@@ -37,17 +37,17 @@ public class LeaseSerializationTest {
checksSerialization(Lease.emptyLease(groupId));
- checksSerialization(newLease("node1", timestamp(now, 1), timestamp(now
+ 1_000_000, 100), true, true, groupId));
+ checksSerialization(newLease("node1", timestamp(now, 1), timestamp(now
+ 1_000_000, 100), true, true, null, groupId));
- checksSerialization(newLease("node1", timestamp(now, 1), timestamp(now
+ 1_000_000, 100), false, false, groupId));
+ checksSerialization(newLease("node1", timestamp(now, 1), timestamp(now
+ 1_000_000, 100), false, false, "node2", groupId));
- checksSerialization(newLease("node1", timestamp(now, 1), timestamp(now
+ 1_000_000, 100), false, true, groupId));
+ checksSerialization(newLease("node1", timestamp(now, 1), timestamp(now
+ 1_000_000, 100), false, true, "node2", groupId));
- checksSerialization(newLease("node1", timestamp(now, 1), timestamp(now
+ 1_000_000, 100), true, false, groupId));
+ checksSerialization(newLease("node1", timestamp(now, 1), timestamp(now
+ 1_000_000, 100), true, false, null, groupId));
- checksSerialization(newLease(null, timestamp(1, 1), timestamp(2 +
1_000_000, 100), true, true, groupId));
+ checksSerialization(newLease(null, timestamp(1, 1), timestamp(2 +
1_000_000, 100), true, true, null, groupId));
- checksSerialization(newLease("node" + new String(new byte[1000]),
timestamp(1, 1), timestamp(2, 100), false, false, groupId));
+ checksSerialization(newLease("node" + new String(new byte[1000]),
timestamp(1, 1), timestamp(2, 100), false, false, null, groupId));
}
@Test
@@ -57,7 +57,15 @@ public class LeaseSerializationTest {
ReplicationGroupId groupId = new TablePartitionId(1, 1);
for (int i = 0; i < 25; i++) {
- leases.add(newLease("node" + i, timestamp(1, i), timestamp(1, i +
1), i % 2 == 0, i % 2 == 1, groupId));
+ leases.add(newLease(
+ "node" + i,
+ timestamp(1, i),
+ timestamp(1, i + 1),
+ i % 2 == 0,
+ i % 2 == 1,
+ i % 2 == 0 ? null : "node" + i,
+ groupId
+ ));
}
byte[] leaseBatchBytes = new LeaseBatch(leases).bytes();
@@ -75,6 +83,7 @@ public class LeaseSerializationTest {
HybridTimestamp expirationTime,
boolean prolong,
boolean accepted,
+ @Nullable String proposedCandidate,
ReplicationGroupId replicationGroupId
) {
return new Lease(
@@ -84,6 +93,7 @@ public class LeaseSerializationTest {
expirationTime,
prolong,
accepted,
+ proposedCandidate,
replicationGroupId
);
}
diff --git
a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
index fddbd55a5d..bc86533795 100644
---
a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
+++
b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.replicator;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.raft.PeersAndLearners.fromConsistentIds;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
@@ -31,6 +32,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.Closeable;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -44,6 +46,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -75,6 +78,7 @@ import
org.apache.ignite.internal.raft.server.RaftGroupOptions;
import
org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration;
import org.apache.ignite.internal.replicator.message.ReplicaMessageTestGroup;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import
org.apache.ignite.internal.replicator.message.TestReplicaMessagesFactory;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.thread.NamedThreadFactory;
@@ -129,6 +133,8 @@ public class ItPlacementDriverReplicaSideTest extends
IgniteAbstractTest {
/** List of services to have to close before the test will be completed. */
private final List<Closeable> servicesToClose = new ArrayList<>();
+ private BiFunction<ReplicaRequest, String,
CompletableFuture<ReplicaResult>> replicaListener = null;
+
@BeforeEach
public void beforeTest(TestInfo testInfo) {
partitionOperationsExecutor = new ThreadPoolExecutor(
@@ -212,6 +218,8 @@ public class ItPlacementDriverReplicaSideTest extends
IgniteAbstractTest {
@AfterEach
public void afterTest() throws Exception {
IgniteUtils.closeAll(servicesToClose);
+
+ replicaListener = null;
}
/**
@@ -263,7 +271,7 @@ public class ItPlacementDriverReplicaSideTest extends
IgniteAbstractTest {
}
@Test
- public void testNotificationToPlacementDriverAboutChangeLeader() throws
Exception {
+ public void testNotificationToPlacementDriverAboutConnectivityProblem()
throws Exception {
Set<String> grpNodes = chooseRandomNodes(3);
log.info("Replication group is based on {}", grpNodes);
@@ -276,24 +284,20 @@ public class ItPlacementDriverReplicaSideTest extends
IgniteAbstractTest {
var leaderNodeName = raftClient.leader().consistentId();
- var newLeaderNodeName = grpNodes.stream().filter(n ->
!n.equals(leaderNodeName)).findAny().get();
-
- log.info("Leader is moving form {} to {}", leaderNodeName,
newLeaderNodeName);
-
ConcurrentHashMap<String, String> nodesToReceivedDeclineMsg = new
ConcurrentHashMap<>();
denyLeaseHandler = (msg, from, to) -> {
nodesToReceivedDeclineMsg.put(to, from);
};
- raftClient.transferLeadership(new Peer(newLeaderNodeName)).get();
-
var anyNode = randomNode(Set.of());
log.info("Message sent from {} to {}", anyNode, leaderNodeName);
var clusterService = clusterServices.get(anyNode);
+ replicaListener = (request, sender) -> failedFuture(new
IOException("test"));
+
new ReplicaService(
clusterService.messagingService(),
clock,
@@ -483,7 +487,13 @@ public class ItPlacementDriverReplicaSideTest extends
IgniteAbstractTest {
log.info("Handle request [type={}]",
request.getClass().getSimpleName());
return
raftClient.run(REPLICA_MESSAGES_FACTORY.safeTimeSyncCommand().build())
- .thenApply(ignored -> new
ReplicaResult(null, null));
+ .thenCompose(ignored -> {
+ if (replicaListener == null) {
+ return completedFuture(new
ReplicaResult(null, null));
+ } else {
+ return
replicaListener.apply(request, senderId);
+ }
+ });
},
raftClient,
new
PendingComparableValuesTracker<>(Long.MAX_VALUE));
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 411cc501ac..182e00537b 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -263,9 +263,9 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
return true;
}
- // It's something else: either a JRE thread or an Ignite thread
not marked with ThreadAttributes. In any case,
- // let it proceed. If it touches a storage, we'll see an assertion.
- return false;
+ // It's something else: either a JRE thread or an Ignite thread
not marked with ThreadAttributes. As we are not sure,
+ // let's switch: false negative can produce assertion errors.
+ return true;
}
}
@@ -351,14 +351,8 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
clusterNetSvc.messagingService().respond(senderConsistentId,
msg, correlationId);
- if (request instanceof PrimaryReplicaRequest) {
- ClusterNode localNode =
clusterNetSvc.topologyService().localMember();
-
- if (!localNode.name().equals(replica.proposedPrimary())) {
- stopLeaseProlongation(request.groupId(),
replica.proposedPrimary());
- } else if (isConnectivityRelatedException(ex)) {
- stopLeaseProlongation(request.groupId(), null);
- }
+ if (request instanceof PrimaryReplicaRequest &&
isConnectivityRelatedException(ex)) {
+ stopLeaseProlongation(request.groupId(), null);
}
if (ex == null && res.replicationFuture() != null) {
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
index 429af61f43..119a1ab3d1 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
@@ -18,8 +18,8 @@
package org.apache.ignite.internal.table;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.SessionUtils.executeUpdate;
-import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.bypassingThreadAssertions;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
@@ -146,7 +146,7 @@ public class ItDurableFinishTest extends
ClusterPerTestIntegrationTest {
// Drop all finish messages to the old primary, pick a new one.
// The coordinator will get a response from the new primary.
- CompletableFuture<Void> transferPrimaryFuture =
changePrimaryOnFinish(context.coordinatorNode, context.publicTable);
+ CompletableFuture<Void> transferPrimaryFuture =
changePrimaryOnFinish(context.coordinatorNode);
// The primary is changed after calculating the outcome and commit
timestamp.
// The new primary successfully commits such transaction.
@@ -155,8 +155,7 @@ public class ItDurableFinishTest extends
ClusterPerTestIntegrationTest {
assertThat(transferPrimaryFuture, willCompleteSuccessfully());
}
- private CompletableFuture<Void> changePrimaryOnFinish(IgniteImpl
coordinatorNode, Table publicTable) {
- TableImpl tableImpl = unwrapTableImpl(publicTable);
+ private CompletableFuture<Void> changePrimaryOnFinish(IgniteImpl
coordinatorNode) {
DefaultMessagingService coordinatorMessaging =
messaging(coordinatorNode);
AtomicBoolean dropMessage = new AtomicBoolean(true);
@@ -186,7 +185,7 @@ public class ItDurableFinishTest extends
ClusterPerTestIntegrationTest {
logger().info("Start transferring primary.");
- NodeUtils.transferPrimary(tableImpl, null, this::node);
+
NodeUtils.transferPrimary(cluster.runningNodes().collect(toSet()),
defaultTablePartitionId(node(0)), null);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
@@ -248,15 +247,14 @@ public class ItDurableFinishTest extends
ClusterPerTestIntegrationTest {
Context context = prepareTransactionData();
// The transaction is committed but the primary expires right before
applying the cleanup message.
- CompletableFuture<Void> transferPrimaryFuture =
changePrimaryOnCleanup(context.primaryNode, context.publicTable);
+ CompletableFuture<Void> transferPrimaryFuture =
changePrimaryOnCleanup(context.primaryNode);
commitAndValidate(context.tx, context.publicTable, context.keyTpl);
assertThat(transferPrimaryFuture, willCompleteSuccessfully());
}
- private CompletableFuture<Void> changePrimaryOnCleanup(IgniteImpl
primaryNode, Table publicTable) {
- TableImpl tableImpl = unwrapTableImpl(publicTable);
+ private CompletableFuture<Void> changePrimaryOnCleanup(IgniteImpl
primaryNode) {
DefaultMessagingService primaryMessaging = messaging(primaryNode);
AtomicBoolean dropMessage = new AtomicBoolean(true);
@@ -286,7 +284,7 @@ public class ItDurableFinishTest extends
ClusterPerTestIntegrationTest {
logger().info("Start transferring primary.");
- NodeUtils.transferPrimary(tableImpl, null, this::node);
+
NodeUtils.transferPrimary(cluster.runningNodes().collect(toSet()),
defaultTablePartitionId(node(0)), null);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
diff --git
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/table/NodeUtils.java
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/table/NodeUtils.java
index 4494563f3a..6dfb2ff26f 100644
---
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/table/NodeUtils.java
+++
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/table/NodeUtils.java
@@ -20,25 +20,18 @@ package org.apache.ignite.internal.table;
import static java.util.concurrent.TimeUnit.SECONDS;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
-import static
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.util.Collection;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.IntFunction;
-import java.util.stream.IntStream;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import
org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
import
org.apache.ignite.internal.placementdriver.message.StopLeaseProlongationMessage;
-import org.apache.ignite.internal.raft.Peer;
-import org.apache.ignite.internal.raft.service.RaftGroupService;
-import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.jetbrains.annotations.Nullable;
/**
@@ -54,117 +47,67 @@ public class NodeUtils {
/**
* Transfers the primary rights to another node.
*
- * @param tbl Table.
+ * @param nodes Nodes collection.
+ * @param groupId Group id.
* @param preferablePrimary Primary replica name which is preferred for
being primary or {@code null}.
- * @return Future which points to a new primary replica name.
+ * @return New primary replica name.
* @throws InterruptedException If failed.
*/
- // TODO: IGNITE-20365: Replace this method when proper primary change
method is implemented.
- public static String transferPrimary(TableViewInternal tbl, @Nullable
String preferablePrimary, IntFunction<IgniteImpl> nodes)
- throws InterruptedException {
- var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0);
-
- CompletableFuture<ReplicaMeta> primaryReplicaFut =
nodes.apply(0).placementDriver().awaitPrimaryReplica(
- tblReplicationGrp,
- nodes.apply(0).clock().now(),
- AWAIT_PRIMARY_REPLICA_TIMEOUT,
- SECONDS
- );
-
- assertThat(primaryReplicaFut, willCompleteSuccessfully());
-
- String primary = primaryReplicaFut.join().getLeaseholder();
-
- if (preferablePrimary != null && preferablePrimary.equals(primary)) {
- return primary;
+ public static String transferPrimary(
+ Collection<IgniteImpl> nodes,
+ ReplicationGroupId groupId,
+ @Nullable String preferablePrimary
+ ) throws InterruptedException {
+ LOG.info("Moving the primary replica [preferablePrimary=" +
preferablePrimary + "].");
+
+ IgniteImpl node = nodes.stream().findAny().orElseThrow();
+
+ ReplicaMeta currentLeaseholder = leaseholder(node, groupId);
+
+ IgniteImpl leaseholderNode = nodes.stream()
+ .filter(n ->
n.id().equals(currentLeaseholder.getLeaseholderId()))
+ .findFirst().orElseThrow();
+
+ if (preferablePrimary == null) {
+ // No preference given: choose any node that is NOT the current leaseholder, otherwise the lease would not move.
+ preferablePrimary = nodes.stream()
+ .map(IgniteImpl::name)
+ .filter(n -> !n.equals(currentLeaseholder.getLeaseholder()))
+ .findFirst()
+ .orElseThrow();
}
- // Change leader for the replication group.
-
- RaftGroupService raftSrvc =
tbl.internalTable().tableRaftService().partitionRaftGroupService(0);
-
- raftSrvc.refreshLeader();
+ String finalPreferablePrimary = preferablePrimary;
- Peer leader = raftSrvc.leader();
+ StopLeaseProlongationMessage msg =
PLACEMENT_DRIVER_MESSAGES_FACTORY.stopLeaseProlongationMessage()
+ .groupId(groupId)
+ .redirectProposal(preferablePrimary)
+ .build();
- Peer newLeader = null;
-
- if (preferablePrimary != null) {
- for (Peer peer : raftSrvc.peers()) {
- if (peer.consistentId().equals(preferablePrimary)) {
- newLeader = peer;
- }
- }
- }
-
- if (newLeader == null) {
- for (Peer peer : raftSrvc.peers()) {
- if (!leader.equals(peer)) {
- newLeader = peer;
- }
- }
- }
+ nodes.forEach(
+ n ->
leaseholderNode.clusterService().messagingService().send(n.clusterService().topologyService().localMember(),
msg)
+ );
- assertNotNull(newLeader);
+ assertTrue(waitForCondition(() -> {
+ ReplicaMeta newPrimaryReplica = leaseholder(node, groupId);
- assertThat(raftSrvc.transferLeadership(newLeader),
willCompleteSuccessfully());
+ return
newPrimaryReplica.getLeaseholder().equals(finalPreferablePrimary);
+ }, 10_000));
- LOG.info("Leader moved [from={}, to={}]", leader, newLeader);
+ LOG.info("Primary replica moved successfully.");
- // Leader changed.
+ return finalPreferablePrimary;
+ }
- AtomicReference<String> newLeaseholder = new AtomicReference<>();
+ private static ReplicaMeta leaseholder(IgniteImpl node, ReplicationGroupId
groupId) {
+ CompletableFuture<ReplicaMeta> leaseholderFuture =
node.placementDriver().awaitPrimaryReplica(
+ groupId,
+ node.clock().now(),
+ AWAIT_PRIMARY_REPLICA_TIMEOUT,
+ SECONDS
+ );
- AtomicLong lastInsertAttempt = new AtomicLong();
+ assertThat(leaseholderFuture, willCompleteSuccessfully());
- assertTrue(waitForCondition(() -> {
- CompletableFuture<ReplicaMeta> newPrimaryReplicaFut =
nodes.apply(0).placementDriver().awaitPrimaryReplica(
- tblReplicationGrp,
- nodes.apply(0).clock().now(),
- AWAIT_PRIMARY_REPLICA_TIMEOUT,
- SECONDS
- );
-
- assertThat(newPrimaryReplicaFut, willCompleteSuccessfully());
-
- if (!primary.equals(newPrimaryReplicaFut.join().getLeaseholder()))
{
-
newLeaseholder.set(newPrimaryReplicaFut.join().getLeaseholder());
-
- return true;
- } else {
- // Insert is needed to notify the placement driver about a
leader for the group was changed.
- try {
- long lastTs = lastInsertAttempt.get();
-
- if (coarseCurrentTimeMillis() - lastTs > 1_000 &&
lastInsertAttempt.compareAndSet(lastTs, coarseCurrentTimeMillis())) {
- int nodeCount = nodes.apply(0).clusterNodes().size();
-
- IgniteImpl primaryNode = IntStream.range(0, nodeCount)
- .mapToObj(i -> nodes.apply(i))
- .filter(n -> n.name().equals(primary))
- .findAny().orElseThrow();
-
- StopLeaseProlongationMessage msg =
PLACEMENT_DRIVER_MESSAGES_FACTORY.stopLeaseProlongationMessage()
- .groupId(tblReplicationGrp)
- .build();
-
- IntStream.range(0, nodeCount)
- .mapToObj(i -> nodes.apply(i))
- .forEach(n ->
primaryNode.clusterService().messagingService().send(
-
n.clusterService().topologyService().localMember(),
- msg
- ));
- }
- } catch (Exception e) {
- LOG.error("Failed to perform insert", e);
- }
-
- return false;
- }
- }, 60_000));
-
- LOG.info("Primary replica moved [from={}, to={}]", primary,
newLeaseholder.get());
-
- return newLeaseholder.get();
+ return leaseholderFuture.join();
}
}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
index 391616b25e..cabe6f8bec 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
@@ -309,7 +309,7 @@ class ItTableRaftSnapshotsTest extends
IgniteIntegrationTest {
RaftGroupService raftGroupService =
cluster.leaderServiceFor(tablePartitionId);
assertThat(
- "Unexpected leadership change",
+ "Unexpected leadership change on group: " + tablePartitionId,
raftGroupService.getServerId().getConsistentId(),
is(cluster.node(expectedLeaderNodeIndex).name())
);
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionPrimaryChangeTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionPrimaryChangeTest.java
index 29aeab0ec8..cf21fb76eb 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionPrimaryChangeTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionPrimaryChangeTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.table;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.SessionUtils.executeUpdate;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteTransaction;
import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
@@ -162,7 +163,7 @@ public class ItTransactionPrimaryChangeTest extends
ClusterPerTestIntegrationTes
assertThat(fullTxReplicationAttemptFuture,
willCompleteSuccessfully());
// Changing the primary.
- NodeUtils.transferPrimary(tbl, txCrdNode.name(), cluster::node);
+
NodeUtils.transferPrimary(cluster.runningNodes().collect(toList()),
tblReplicationGrp, txCrdNode.name());
// Start a regular transaction that increments the value. It
should see the initially inserted value and its commit should
// succeed.