This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 514949e722 IGNITE-22555 Fixed assertion in ReplicaStateManager.onPrimaryElected (#4055)
514949e722 is described below
commit 514949e722ad9899d9d77d0079a7e93b9a77cca4
Author: Denis Chudov <[email protected]>
AuthorDate: Tue Jul 16 19:25:36 2024 +0300
IGNITE-22555 Fixed assertion in ReplicaStateManager.onPrimaryElected (#4055)
---
.../internal/placementdriver/LeaseUpdater.java | 36 ++++++++++++++++++++--
.../placementdriver/leases/LeaseTracker.java | 8 ++---
.../negotiation/LeaseAgreement.java | 21 +++++++++++--
.../negotiation/LeaseNegotiator.java | 29 ++++++++++++-----
.../ignite/internal/replicator/ReplicaImpl.java | 5 +--
.../ignite/internal/replicator/ReplicaManager.java | 16 +++++-----
.../runner/app/ItReplicaStateManagerTest.java | 2 --
7 files changed, 88 insertions(+), 29 deletions(-)
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
index 8d77d1fff4..7b149297f7 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.placementdriver;
import static java.util.Objects.hash;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.or;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.value;
@@ -222,7 +223,7 @@ public class LeaseUpdater {
private CompletableFuture<Boolean> denyLease(ReplicationGroupId grpId,
Lease lease, String redirectProposal) {
Lease deniedLease = lease.denyLease(redirectProposal);
- leaseNegotiator.onLeaseRemoved(grpId);
+ leaseNegotiator.cancelAgreement(grpId);
Leases leasesCurrent = leaseTracker.leasesCurrent();
@@ -371,7 +372,17 @@ public class LeaseUpdater {
agreement.checkValid(grpId,
topologyTracker.currentTopologySnapshot(), assignments);
if (agreement.isAccepted()) {
- publishLease(grpId, lease, renewedLeases);
+ Lease negotiatedLease = agreement.getLease();
+
+ // The lease is taken from the lease tracker, where it appears on meta storage watch updates, so it can be
+ // stale if watch processing was delayed for some reason. This is fine: the negotiated lease is guaranteed
+ // to be written to meta storage before negotiation begins, so its start time is never less than the start
+ // time of the tracked lease.
+ assert negotiatedLease.getStartTime().longValue() >=
lease.getStartTime().longValue()
+ : format("Can't publish the lease that was not
negotiated [groupId={}, startTime={}, "
+ + "agreementLeaseStartTime={}].", grpId,
lease.getStartTime(), negotiatedLease.getStartTime());
+
+ publishLease(grpId, negotiatedLease, renewedLeases);
continue;
} else if (agreement.isDeclined()) {
@@ -449,11 +460,15 @@ public class LeaseUpdater {
if (e != null) {
LOG.error("Lease update invocation failed", e);
+ cancelAgreements(toBeNegotiated.keySet());
+
return;
}
if (!success) {
- LOG.debug("Lease update invocation failed");
+ LOG.warn("Lease update invocation failed because of
concurrent update.");
+
+ cancelAgreements(toBeNegotiated.keySet());
return;
}
@@ -467,6 +482,18 @@ public class LeaseUpdater {
});
}
+ /**
+ * Cancels all the given agreements. This should be done if the new leases that were to be negotiated have not been
+ * written to meta storage.
+ *
+ * @param groupIds Replication group ids.
+ */
+ private void cancelAgreements(Collection<ReplicationGroupId> groupIds)
{
+ for (ReplicationGroupId groupId : groupIds) {
+ leaseNegotiator.cancelAgreement(groupId);
+ }
+ }
+
/**
* Writes a new lease.
*
@@ -487,6 +514,9 @@ public class LeaseUpdater {
renewedLeases.put(grpId, renewedLease);
+ // Lease agreement should be created synchronously before
negotiation begins.
+ leaseNegotiator.createAgreement(grpId, renewedLease);
+
leaseUpdateStatistics.onLeaseCreate();
}
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
index 61e59f056a..ea1be1d3df 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
@@ -196,7 +196,7 @@ public class LeaseTracker extends
AbstractEventProducer<PrimaryReplicaEvent, Pri
.update(lease.getExpirationTime(), lease);
if
(needFireEventReplicaBecomePrimary(previousLeasesMap.get(grpId), lease)) {
-
fireEventFutures.add(fireEventReplicaBecomePrimary(event.revision(), lease));
+
fireEventFutures.add(fireEventPrimaryReplicaElected(event.revision(), lease));
}
}
@@ -218,7 +218,7 @@ public class LeaseTracker extends
AbstractEventProducer<PrimaryReplicaEvent, Pri
leases = new Leases(unmodifiableMap(leasesMap),
leasesBytes);
for (Lease expiredLease : expiredLeases) {
- firePrimaryReplicaExpiredEvent(event.revision(),
expiredLease);
+ fireEventPrimaryReplicaExpired(event.revision(),
expiredLease);
}
}
@@ -380,7 +380,7 @@ public class LeaseTracker extends
AbstractEventProducer<PrimaryReplicaEvent, Pri
* @param causalityToken Causality token.
* @param expiredLease Expired lease.
*/
- private void firePrimaryReplicaExpiredEvent(long causalityToken, Lease
expiredLease) {
+ private void fireEventPrimaryReplicaExpired(long causalityToken, Lease
expiredLease) {
ReplicationGroupId grpId = expiredLease.replicationGroupId();
CompletableFuture<Void> prev = expirationFutureByGroup.put(grpId,
fireEvent(
@@ -397,7 +397,7 @@ public class LeaseTracker extends
AbstractEventProducer<PrimaryReplicaEvent, Pri
assert prev == null || prev.isDone() : "Previous lease expiration
process has not completed yet [grpId=" + grpId + ']';
}
- private CompletableFuture<Void> fireEventReplicaBecomePrimary(long
causalityToken, Lease lease) {
+ private CompletableFuture<Void> fireEventPrimaryReplicaElected(long
causalityToken, Lease lease) {
String leaseholderId = lease.getLeaseholderId();
assert leaseholderId != null : lease;
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseAgreement.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseAgreement.java
index fa1fce72e9..f316a80463 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseAgreement.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseAgreement.java
@@ -45,7 +45,7 @@ public class LeaseAgreement {
* The agreement, which has not try negotiating yet. We assume that it is
{@link #ready()} and not {@link #isAccepted()}
* which allows both initiation and retries of negotiation.
*/
- public static final LeaseAgreement UNDEFINED_AGREEMENT = new
LeaseAgreement(null, nullCompletedFuture());
+ static final LeaseAgreement UNDEFINED_AGREEMENT = new LeaseAgreement(null,
nullCompletedFuture());
/** Lease. */
private final Lease lease;
@@ -57,9 +57,18 @@ public class LeaseAgreement {
* The constructor.
*
* @param lease Lease.
+ */
+ public LeaseAgreement(Lease lease) {
+ this(lease, new CompletableFuture<>());
+ }
+
+ /**
+ * The constructor for private use.
+ *
+ * @param lease Lease.
* @param remoteNodeResponseFuture The future of response from the remote
node which is negotiating the agreement.
*/
- public LeaseAgreement(Lease lease,
CompletableFuture<LeaseGrantedMessageResponse> remoteNodeResponseFuture) {
+ private LeaseAgreement(Lease lease,
CompletableFuture<LeaseGrantedMessageResponse> remoteNodeResponseFuture) {
this.lease = lease;
this.responseFut = requireNonNull(remoteNodeResponseFuture);
}
@@ -158,4 +167,22 @@ public class LeaseAgreement {
}
}
}
+
+ /**
+ * Completes the agreement with the response received from the remote node. Has no effect if the agreement has already
+ * been completed or cancelled.
+ *
+ * @param response Response of the remote node.
+ */
+ void onResponse(LeaseGrantedMessageResponse response) {
+ responseFut.complete(response);
+ }
+
+ /**
+ * Cancels the agreement by completing its response future with {@code null}. Has no effect if the agreement has
+ * already been completed.
+ */
+ void cancel() {
+ responseFut.complete(null);
+ }
}
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
index 2e9f2160bb..fd62dfe082 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
@@ -21,7 +21,6 @@ import static
org.apache.ignite.internal.placementdriver.negotiation.LeaseAgreem
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -66,11 +65,11 @@ public class LeaseNegotiator {
* @param force If the flag is true, the process tries to insist of apply
the lease.
*/
public void negotiate(Lease lease, boolean force) {
- var fut = new CompletableFuture<LeaseGrantedMessageResponse>();
-
ReplicationGroupId groupId = lease.replicationGroupId();
- leaseToNegotiate.put(groupId, new LeaseAgreement(lease, fut));
+ LeaseAgreement agreement = leaseToNegotiate.get(groupId);
+
+ assert agreement != null : "Lease agreement should exist when
negotiation begins [groupId=" + groupId + "].";
long leaseInterval = lease.getExpirationTime().getPhysical() -
lease.getStartTime().getPhysical();
@@ -90,13 +89,13 @@ public class LeaseNegotiator {
LeaseGrantedMessageResponse response =
(LeaseGrantedMessageResponse) msg;
- fut.complete(response);
+ agreement.onResponse(response);
} else {
if (!(unwrapCause(throwable) instanceof
NodeStoppingException)) {
LOG.warn("Lease was not negotiated due to
exception [lease={}]", throwable, lease);
}
- fut.complete(null);
+ agreement.cancel();
}
});
}
@@ -120,12 +119,26 @@ public class LeaseNegotiator {
return res[0] == null ? UNDEFINED_AGREEMENT : res[0];
}
+ /**
+ * Creates an agreement for the given lease, replacing any existing agreement for the group. Must be called before negotiation begins.
+ *
+ * @param groupId Replication group id.
+ * @param lease Lease to negotiate.
+ */
+ public void createAgreement(ReplicationGroupId groupId, Lease lease) {
+ leaseToNegotiate.put(groupId, new LeaseAgreement(lease));
+ }
+
/**
- * Removes lease from list to negotiate.
+ * Removes the agreement of the given group from the list to negotiate and cancels it, if it exists.
*
- * @param groupId Lease to expire.
+ * @param groupId Replication group id.
*/
- public void onLeaseRemoved(ReplicationGroupId groupId) {
- leaseToNegotiate.remove(groupId);
+ public void cancelAgreement(ReplicationGroupId groupId) {
+ LeaseAgreement agreement = leaseToNegotiate.remove(groupId);
+
+ if (agreement != null) {
+ agreement.cancel();
+ }
}
}
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaImpl.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaImpl.java
index 41de204324..86dc454979 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaImpl.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaImpl.java
@@ -196,7 +196,8 @@ public class ReplicaImpl implements Replica {
* @return Future that contains a result.
*/
private CompletableFuture<LeaseGrantedMessageResponse>
processLeaseGrantedMessage(LeaseGrantedMessage msg) {
- LOG.info("Received LeaseGrantedMessage for replica belonging to
group=" + groupId() + ", force=" + msg.force());
+ LOG.info("Received LeaseGrantedMessage for replica [groupId={},
leaseStartTime={}, force={}].", groupId(), msg.leaseStartTime(),
+ msg.force());
return
placementDriver.previousPrimaryExpired(groupId()).thenCompose(unused ->
leaderFuture().thenCompose(leader -> {
HybridTimestamp leaseExpirationTime = this.leaseExpirationTime;
@@ -282,7 +283,7 @@ public class ReplicaImpl implements Replica {
LOG.info("Waiting for actual storage state, group=" + groupId());
if (!replicaReservationClosure.apply(groupId(), startTime)) {
- throw new IllegalStateException("Replica reservation failed
[groupId=" + groupId() + "].");
+ throw new IllegalStateException("Replica reservation failed
[groupId=" + groupId() + ", leaseStartTime=" + startTime + "].");
}
long timeout = expirationTime - currentTimeMillis();
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 90d99aac95..57585d8c55 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -1269,11 +1269,9 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
synchronized (context) {
if (localNodeId.equals(parameters.leaseholderId())) {
assert context.replicaState != ReplicaState.STOPPED :
"Unexpected primary replica state STOPPED [groupId="
- + groupId + "].";
-
- context.assertReservation(groupId);
+ + groupId + ", leaseStartTime=" +
parameters.startTime() + "].";
} else if (context.reservedForPrimary) {
- context.assertReservation(groupId);
+ context.assertReservation(groupId, parameters.startTime());
// Unreserve if another replica was elected as primary,
only if its lease start time is greater,
// otherwise it means that event is too late relatively to
lease negotiation start and should be ignored.
@@ -1296,7 +1294,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
if (context != null) {
synchronized (context) {
- context.assertReservation(parameters.groupId());
+ context.assertReservation(parameters.groupId(),
parameters.startTime());
// Unreserve if primary replica expired, only if its
lease start time is greater,
// otherwise it means that event is too late
relatively to lease negotiation start and should be ignored.
if
(parameters.startTime().equals(context.leaseStartTime)) {
@@ -1552,9 +1550,18 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
leaseStartTime = null;
}
- void assertReservation(ReplicationGroupId groupId) {
- assert reservedForPrimary : "Replica is elected as primary but not
reserved [groupId=" + groupId + "].";
- assert leaseStartTime != null : "Replica is reserved but lease
start time is null [groupId=" + groupId + "].";
+ /**
+ * Asserts that the replica is reserved for primary and that the reservation has a lease start time.
+ *
+ * @param groupId Replication group id, used in assertion messages.
+ * @param eventLeaseStartTime Lease start time from the processed primary replica event, used in assertion messages.
+ */
+ void assertReservation(ReplicationGroupId groupId, HybridTimestamp
eventLeaseStartTime) {
+ assert reservedForPrimary : "Replica is elected as primary but not
reserved [groupId="
+ + groupId + ", leaseStartTime=" + eventLeaseStartTime + "].";
+ // Check the field, not the event's timestamp: the parameter is named differently so that it does not shadow the field.
+ assert this.leaseStartTime != null : "Replica is reserved but lease
start time is null [groupId="
+ + groupId + ", leaseStartTime=" + eventLeaseStartTime + "].";
}
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItReplicaStateManagerTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItReplicaStateManagerTest.java
index 7b90f79329..ccb32f4afc 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItReplicaStateManagerTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItReplicaStateManagerTest.java
@@ -44,7 +44,6 @@ import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.table.TableImpl;
import org.jetbrains.annotations.Nullable;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/**
@@ -65,7 +64,6 @@ public class ItReplicaStateManagerTest extends
BaseIgniteRestartTest {
}
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-22629")
public void testReplicaStatesManagement() throws InterruptedException {
int nodesCount = 3;
List<IgniteImpl> nodes = startNodes(nodesCount);