This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 514949e722 IGNITE-22555 Fixed assertion in ReplicaStateManager.onPrimaryElected (#4055)
514949e722 is described below

commit 514949e722ad9899d9d77d0079a7e93b9a77cca4
Author: Denis Chudov <[email protected]>
AuthorDate: Tue Jul 16 19:25:36 2024 +0300

    IGNITE-22555 Fixed assertion in ReplicaStateManager.onPrimaryElected (#4055)
---
 .../internal/placementdriver/LeaseUpdater.java     | 36 ++++++++++++++++++++--
 .../placementdriver/leases/LeaseTracker.java       |  8 ++---
 .../negotiation/LeaseAgreement.java                | 21 +++++++++++--
 .../negotiation/LeaseNegotiator.java               | 29 ++++++++++++-----
 .../ignite/internal/replicator/ReplicaImpl.java    |  5 +--
 .../ignite/internal/replicator/ReplicaManager.java | 16 +++++-----
 .../runner/app/ItReplicaStateManagerTest.java      |  2 --
 7 files changed, 88 insertions(+), 29 deletions(-)

diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
index 8d77d1fff4..7b149297f7 100644
--- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
+++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.placementdriver;
 
 import static java.util.Objects.hash;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.or;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.value;
@@ -222,7 +223,7 @@ public class LeaseUpdater {
     private CompletableFuture<Boolean> denyLease(ReplicationGroupId grpId, 
Lease lease, String redirectProposal) {
         Lease deniedLease = lease.denyLease(redirectProposal);
 
-        leaseNegotiator.onLeaseRemoved(grpId);
+        leaseNegotiator.cancelAgreement(grpId);
 
         Leases leasesCurrent = leaseTracker.leasesCurrent();
 
@@ -371,7 +372,17 @@ public class LeaseUpdater {
                     agreement.checkValid(grpId, 
topologyTracker.currentTopologySnapshot(), assignments);
 
                     if (agreement.isAccepted()) {
-                        publishLease(grpId, lease, renewedLeases);
+                        Lease negotiatedLease = agreement.getLease();
+
+                        // Lease information is taken from lease tracker, 
where it appears on meta storage watch updates, so it can contain
+                        // stale leases, if watch processing was delayed for 
some reason. It is ok: negotiated lease is guaranteed to be
+                        // already written to meta storage before negotiation 
begins, and in this case its start time would be
+                        // greater than lease's.
+                        assert negotiatedLease.getStartTime().longValue() >= 
lease.getStartTime().longValue()
+                                : format("Can't publish the lease that was not 
negotiated [groupId={}, startTime={}, "
+                                    + "agreementLeaseStartTime={}].", grpId, 
lease.getStartTime(), agreement.getLease().getStartTime());
+
+                        publishLease(grpId, negotiatedLease, renewedLeases);
 
                         continue;
                     } else if (agreement.isDeclined()) {
@@ -449,11 +460,15 @@ public class LeaseUpdater {
                 if (e != null) {
                     LOG.error("Lease update invocation failed", e);
 
+                    cancelAgreements(toBeNegotiated.keySet());
+
                     return;
                 }
 
                 if (!success) {
-                    LOG.debug("Lease update invocation failed");
+                    LOG.warn("Lease update invocation failed because of 
concurrent update.");
+
+                    cancelAgreements(toBeNegotiated.keySet());
 
                     return;
                 }
@@ -467,6 +482,18 @@ public class LeaseUpdater {
             });
         }
 
+        /**
+         * Cancel all the given agreements. This should be done if the new 
leases that were to be negotiated had been not written to meta
+         * storage.
+         *
+         * @param groupIds Group ids.
+         */
+        private void cancelAgreements(Collection<ReplicationGroupId> groupIds) 
{
+            for (ReplicationGroupId groupId : groupIds) {
+                leaseNegotiator.cancelAgreement(groupId);
+            }
+        }
+
         /**
          * Writes a new lease.
          *
@@ -487,6 +514,9 @@ public class LeaseUpdater {
 
             renewedLeases.put(grpId, renewedLease);
 
+            // Lease agreement should be created synchronously before 
negotiation begins.
+            leaseNegotiator.createAgreement(grpId, renewedLease);
+
             leaseUpdateStatistics.onLeaseCreate();
         }
 
diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
index 61e59f056a..ea1be1d3df 100644
--- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
+++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
@@ -196,7 +196,7 @@ public class LeaseTracker extends 
AbstractEventProducer<PrimaryReplicaEvent, Pri
                                     .update(lease.getExpirationTime(), lease);
 
                             if 
(needFireEventReplicaBecomePrimary(previousLeasesMap.get(grpId), lease)) {
-                                
fireEventFutures.add(fireEventReplicaBecomePrimary(event.revision(), lease));
+                                
fireEventFutures.add(fireEventPrimaryReplicaElected(event.revision(), lease));
                             }
                         }
 
@@ -218,7 +218,7 @@ public class LeaseTracker extends 
AbstractEventProducer<PrimaryReplicaEvent, Pri
                     leases = new Leases(unmodifiableMap(leasesMap), 
leasesBytes);
 
                     for (Lease expiredLease : expiredLeases) {
-                        firePrimaryReplicaExpiredEvent(event.revision(), 
expiredLease);
+                        fireEventPrimaryReplicaExpired(event.revision(), 
expiredLease);
                     }
                 }
 
@@ -380,7 +380,7 @@ public class LeaseTracker extends 
AbstractEventProducer<PrimaryReplicaEvent, Pri
      * @param causalityToken Causality token.
      * @param expiredLease Expired lease.
      */
-    private void firePrimaryReplicaExpiredEvent(long causalityToken, Lease 
expiredLease) {
+    private void fireEventPrimaryReplicaExpired(long causalityToken, Lease 
expiredLease) {
         ReplicationGroupId grpId = expiredLease.replicationGroupId();
 
         CompletableFuture<Void> prev = expirationFutureByGroup.put(grpId, 
fireEvent(
@@ -397,7 +397,7 @@ public class LeaseTracker extends 
AbstractEventProducer<PrimaryReplicaEvent, Pri
         assert prev == null || prev.isDone() : "Previous lease expiration 
process has not completed yet [grpId=" + grpId + ']';
     }
 
-    private CompletableFuture<Void> fireEventReplicaBecomePrimary(long 
causalityToken, Lease lease) {
+    private CompletableFuture<Void> fireEventPrimaryReplicaElected(long 
causalityToken, Lease lease) {
         String leaseholderId = lease.getLeaseholderId();
 
         assert leaseholderId != null : lease;
diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseAgreement.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseAgreement.java
index fa1fce72e9..f316a80463 100644
--- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseAgreement.java
+++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseAgreement.java
@@ -45,7 +45,7 @@ public class LeaseAgreement {
      * The agreement, which has not try negotiating yet. We assume that it is 
{@link #ready()} and not {@link #isAccepted()}
      * which allows both initiation and retries of negotiation.
      */
-    public static final LeaseAgreement UNDEFINED_AGREEMENT = new 
LeaseAgreement(null, nullCompletedFuture());
+    static final LeaseAgreement UNDEFINED_AGREEMENT = new LeaseAgreement(null, 
nullCompletedFuture());
 
     /** Lease. */
     private final Lease lease;
@@ -57,9 +57,18 @@ public class LeaseAgreement {
      * The constructor.
      *
      * @param lease Lease.
+     */
+    public LeaseAgreement(Lease lease) {
+        this(lease, new CompletableFuture<>());
+    }
+
+    /**
+     * The constructor for private use.
+     *
+     * @param lease Lease.
      * @param remoteNodeResponseFuture The future of response from the remote 
node which is negotiating the agreement.
      */
-    public LeaseAgreement(Lease lease, 
CompletableFuture<LeaseGrantedMessageResponse> remoteNodeResponseFuture) {
+    private LeaseAgreement(Lease lease, 
CompletableFuture<LeaseGrantedMessageResponse> remoteNodeResponseFuture) {
         this.lease = lease;
         this.responseFut = requireNonNull(remoteNodeResponseFuture);
     }
@@ -158,4 +167,12 @@ public class LeaseAgreement {
             }
         }
     }
+
+    void onResponse(LeaseGrantedMessageResponse response) {
+        responseFut.complete(response);
+    }
+
+    void cancel() {
+        responseFut.complete(null);
+    }
 }
diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
index 2e9f2160bb..fd62dfe082 100644
--- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
+++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
@@ -21,7 +21,6 @@ import static 
org.apache.ignite.internal.placementdriver.negotiation.LeaseAgreem
 import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
 
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -66,11 +65,11 @@ public class LeaseNegotiator {
      * @param force If the flag is true, the process tries to insist of apply 
the lease.
      */
     public void negotiate(Lease lease, boolean force) {
-        var fut = new CompletableFuture<LeaseGrantedMessageResponse>();
-
         ReplicationGroupId groupId = lease.replicationGroupId();
 
-        leaseToNegotiate.put(groupId, new LeaseAgreement(lease, fut));
+        LeaseAgreement agreement = leaseToNegotiate.get(groupId);
+
+        assert agreement != null : "Lease agreement should exist when 
negotiation begins [groupId=" + groupId + "].";
 
         long leaseInterval = lease.getExpirationTime().getPhysical() - 
lease.getStartTime().getPhysical();
 
@@ -90,13 +89,13 @@ public class LeaseNegotiator {
 
                         LeaseGrantedMessageResponse response = 
(LeaseGrantedMessageResponse) msg;
 
-                        fut.complete(response);
+                        agreement.onResponse(response);
                     } else {
                         if (!(unwrapCause(throwable) instanceof 
NodeStoppingException)) {
                             LOG.warn("Lease was not negotiated due to 
exception [lease={}]", throwable, lease);
                         }
 
-                        fut.complete(null);
+                        agreement.cancel();
                     }
                 });
     }
@@ -120,12 +119,26 @@ public class LeaseNegotiator {
         return res[0] == null ? UNDEFINED_AGREEMENT : res[0];
     }
 
+    /**
+     * Creates an agreement.
+     *
+     * @param groupId Group id.
+     * @param lease Lease to negotiate.
+     */
+    public void createAgreement(ReplicationGroupId groupId, Lease lease) {
+        leaseToNegotiate.put(groupId, new LeaseAgreement(lease));
+    }
+
     /**
      * Removes lease from list to negotiate.
      *
      * @param groupId Lease to expire.
      */
-    public void onLeaseRemoved(ReplicationGroupId groupId) {
-        leaseToNegotiate.remove(groupId);
+    public void cancelAgreement(ReplicationGroupId groupId) {
+        LeaseAgreement agreement = leaseToNegotiate.remove(groupId);
+
+        if (agreement != null) {
+            agreement.cancel();
+        }
     }
 }
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaImpl.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaImpl.java
index 41de204324..86dc454979 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaImpl.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaImpl.java
@@ -196,7 +196,8 @@ public class ReplicaImpl implements Replica {
      * @return Future that contains a result.
      */
     private CompletableFuture<LeaseGrantedMessageResponse> 
processLeaseGrantedMessage(LeaseGrantedMessage msg) {
-        LOG.info("Received LeaseGrantedMessage for replica belonging to 
group=" + groupId() + ", force=" + msg.force());
+        LOG.info("Received LeaseGrantedMessage for replica [groupId={}, 
leaseStartTime={}, force={}].", groupId(), msg.leaseStartTime(),
+                msg.force());
 
         return 
placementDriver.previousPrimaryExpired(groupId()).thenCompose(unused -> 
leaderFuture().thenCompose(leader -> {
             HybridTimestamp leaseExpirationTime = this.leaseExpirationTime;
@@ -282,7 +283,7 @@ public class ReplicaImpl implements Replica {
         LOG.info("Waiting for actual storage state, group=" + groupId());
 
         if (!replicaReservationClosure.apply(groupId(), startTime)) {
-            throw new IllegalStateException("Replica reservation failed 
[groupId=" + groupId() + "].");
+            throw new IllegalStateException("Replica reservation failed 
[groupId=" + groupId() + ", leaseStartTime=" + startTime + "].");
         }
 
         long timeout = expirationTime - currentTimeMillis();
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 90d99aac95..57585d8c55 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -1269,11 +1269,9 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
             synchronized (context) {
                 if (localNodeId.equals(parameters.leaseholderId())) {
                     assert context.replicaState != ReplicaState.STOPPED : 
"Unexpected primary replica state STOPPED [groupId="
-                            + groupId + "].";
-
-                    context.assertReservation(groupId);
+                            + groupId + ", leaseStartTime=" + 
parameters.startTime() + "].";
                 } else if (context.reservedForPrimary) {
-                    context.assertReservation(groupId);
+                    context.assertReservation(groupId, parameters.startTime());
 
                     // Unreserve if another replica was elected as primary, 
only if its lease start time is greater,
                     // otherwise it means that event is too late relatively to 
lease negotiation start and should be ignored.
@@ -1296,7 +1294,7 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
 
                 if (context != null) {
                     synchronized (context) {
-                        context.assertReservation(parameters.groupId());
+                        context.assertReservation(parameters.groupId(), 
parameters.startTime());
                         // Unreserve if primary replica expired, only if its 
lease start time is greater,
                         // otherwise it means that event is too late 
relatively to lease negotiation start and should be ignored.
                         if 
(parameters.startTime().equals(context.leaseStartTime)) {
@@ -1552,9 +1550,11 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
             leaseStartTime = null;
         }
 
-        void assertReservation(ReplicationGroupId groupId) {
-            assert reservedForPrimary : "Replica is elected as primary but not 
reserved [groupId=" + groupId + "].";
-            assert leaseStartTime != null : "Replica is reserved but lease 
start time is null [groupId=" + groupId + "].";
+        void assertReservation(ReplicationGroupId groupId, HybridTimestamp 
leaseStartTime) {
+            assert reservedForPrimary : "Replica is elected as primary but not 
reserved [groupId="
+                    + groupId + ", leaseStartTime=" + leaseStartTime + "].";
+            assert leaseStartTime != null : "Replica is reserved but lease 
start time is null [groupId="
+                    + groupId + ", leaseStartTime=" + leaseStartTime + "].";
         }
     }
 
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItReplicaStateManagerTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItReplicaStateManagerTest.java
index 7b90f79329..ccb32f4afc 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItReplicaStateManagerTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItReplicaStateManagerTest.java
@@ -44,7 +44,6 @@ import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.table.TableImpl;
 import org.jetbrains.annotations.Nullable;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 /**
@@ -65,7 +64,6 @@ public class ItReplicaStateManagerTest extends 
BaseIgniteRestartTest {
     }
 
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-22629")
     public void testReplicaStatesManagement() throws InterruptedException {
         int nodesCount = 3;
         List<IgniteImpl> nodes = startNodes(nodesCount);

Reply via email to