This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 2fa6e079d7f IGNITE-23246 Fixed leaking LeaseAgreements (#5779)
2fa6e079d7f is described below

commit 2fa6e079d7f27dbbaf8ad7c5566126ce02bf8be2
Author: Denis Chudov <[email protected]>
AuthorDate: Fri May 9 00:09:17 2025 +0300

    IGNITE-23246 Fixed leaking LeaseAgreements (#5779)
---
 .../internal/placementdriver/LeaseUpdater.java     |  5 ++
 .../placementdriver/PlacementDriverManager.java    |  8 ---
 .../negotiation/LeaseNegotiator.java               |  2 +
 .../placementdriver/LeaseNegotiationTest.java      | 60 +++++++++++++++++++++-
 4 files changed, 65 insertions(+), 10 deletions(-)

diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
index dd14fb37a3e..e511799e7e4 100644
--- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
+++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
@@ -199,6 +199,8 @@ public class LeaseUpdater {
                 return;
             }
 
+            LOG.info("Placement driver active actor is starting.");
+
             leaseNegotiator = new LeaseNegotiator(clusterService);
 
             updaterThread = new IgniteThread(nodeName, "lease-updater", updater);
@@ -222,6 +224,8 @@ public class LeaseUpdater {
                 return;
             }
 
+            LOG.info("Placement driver active actor is stopping.");
+
             leaseNegotiator = null;
 
             updaterThread.interrupt();
@@ -574,6 +578,7 @@ public class LeaseUpdater {
                 if (clockService.before(lease.getExpirationTime(), currentTime)
                         && !groupsAmongCurrentStableAndPendingAssignments.contains(groupId)) {
                     iter.remove();
+                    leaseNegotiator.cancelAgreement(groupId);
                 } else if (prolongableLeaseGroupIds.contains(groupId)) {
                     entry.setValue(prolongLease(lease, newExpirationTimestamp));
                 }
diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
index 12556b1de53..7218e66869d 100644
--- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
+++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
@@ -36,8 +36,6 @@ import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.lang.NodeStoppingException;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
@@ -64,8 +62,6 @@ import org.jetbrains.annotations.TestOnly;
  * The another role of the manager is providing a node, which is leaseholder at the moment, for a particular replication group.
  */
 public class PlacementDriverManager implements IgniteComponent {
-    private static final IgniteLogger LOG = Loggers.forClass(PlacementDriverManager.class);
-
     private static final String PLACEMENTDRIVER_LEASES_KEY_STRING = "placementdriver.leases";
 
     public static final ByteArray PLACEMENTDRIVER_LEASES_KEY = ByteArray.fromString(PLACEMENTDRIVER_LEASES_KEY_STRING);
@@ -256,15 +252,11 @@ public class PlacementDriverManager implements IgniteComponent {
 
     /** Takes over active actor of placement driver group. */
     private void takeOverActiveActorBusy() {
-        LOG.info("Placement driver active actor is starting.");
-
         leaseUpdater.activate();
     }
 
     /** Steps down as active actor. */
     private void stepDownActiveActorBusy() {
-        LOG.info("Placement driver active actor is stopping.");
-
         leaseUpdater.deactivate();
     }
 
diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
index 6577c1b439f..c11d3e57510 100644
--- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
+++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
@@ -90,6 +90,8 @@ public class LeaseNegotiator {
                             LOG.warn("Lease was not negotiated due to exception [lease={}]", throwable, lease);
                         }
 
+                        leaseToNegotiate.remove(agreement.groupId(), agreement);
+
                         agreement.cancel();
                     }
                 });
diff --git a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseNegotiationTest.java b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseNegotiationTest.java
index 79934746ae3..3380011178a 100644
--- a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseNegotiationTest.java
+++ b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseNegotiationTest.java
@@ -37,12 +37,15 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.Collection;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiFunction;
@@ -73,6 +76,7 @@ import org.apache.ignite.internal.placementdriver.leases.LeaseTracker;
 import org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessage;
 import org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessageResponse;
 import org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
+import org.apache.ignite.internal.placementdriver.negotiation.LeaseNegotiator;
 import org.apache.ignite.internal.replicator.PartitionGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
@@ -121,7 +125,7 @@ public class LeaseNegotiationTest extends BaseIgniteAbstractTest {
 
     private final long assignmentsTimestamp = new HybridTimestamp(0, 1).longValue();
 
-    @InjectConfiguration
+    @InjectConfiguration("mock.leaseAgreementAcceptanceTimeLimitMillis = 2000")
     private ReplicationConfiguration replicationConfiguration;
 
     private PartitionGroupId replicationGroupId(int objectId, int partId) {
@@ -166,12 +170,14 @@ public class LeaseNegotiationTest extends BaseIgniteAbstractTest {
         pdMessagingService = mock(MessagingService.class);
         when(pdMessagingService.invoke(anyString(), any(), anyLong())).thenAnswer(inv -> {
             String nodeId = inv.getArgument(0);
+            long timeout = inv.getArgument(2);
 
             LeaseGrantedMessage leaseGrantedMessage = inv.getArgument(1);
 
             if (leaseGrantedMessageHandler != null) {
                 return CompletableFuture.supplyAsync(() -> null)
-                        .thenCompose(unused -> leaseGrantedMessageHandler.apply(nodeId, leaseGrantedMessage));
+                        .thenCompose(unused -> leaseGrantedMessageHandler.apply(nodeId, leaseGrantedMessage))
+                        .orTimeout(timeout, TimeUnit.MILLISECONDS);
             } else {
                 return completedFuture(createLeaseGrantedMessageResponse(true));
             }
@@ -414,6 +420,50 @@ public class LeaseNegotiationTest extends BaseIgniteAbstractTest {
         assertTrue(waitForCondition(() -> getAllLeasesFromMs().isEmpty(), 20_000));
     }
 
+    @Test
+    public void testLeaseAgreementCleanup() throws Exception {
+        CompletableFuture<?> timedOutGroupLgmReceived = new CompletableFuture<>();
+        CompletableFuture<?> removedGroupLgmReceived = new CompletableFuture<>();
+
+        PartitionGroupId timedOutGroup = replicationGroupId(1, 1);
+        PartitionGroupId removedGroup = replicationGroupId(1, 2);
+        byte[] assignmentBytes = Assignments.toBytes(Set.of(forPeer(NODE_0_NAME)), assignmentsTimestamp);
+
+        leaseGrantedMessageHandler = (n, lgm) -> {
+            if (lgm.groupId().equals(groupId)) {
+                return completedFuture(createLeaseGrantedMessageResponse(true));
+            } else if (lgm.groupId().equals(timedOutGroup)) {
+                timedOutGroupLgmReceived.complete(null);
+
+                // Return a future that will never be completed, to trigger the agreement timeout.
+                return new CompletableFuture<>();
+            } else {
+                removedGroupLgmReceived.complete(null);
+                return new CompletableFuture<>();
+            }
+        };
+
+        metaStorageManager.put(stableAssignmentsKey(groupId), assignmentBytes);
+        metaStorageManager.put(stableAssignmentsKey(timedOutGroup), assignmentBytes);
+        metaStorageManager.put(stableAssignmentsKey(removedGroup), assignmentBytes);
+
+        // Wait for accepted lease for groupId.
+        assertTrue(waitForCondition(
+                () -> getAllLeasesFromMs().stream().anyMatch(l -> l.replicationGroupId().equals(groupId) && l.isAccepted()),
+                5000
+        ));
+
+        assertThat(timedOutGroupLgmReceived, willSucceedFast());
+        assertThat(removedGroupLgmReceived, willSucceedFast());
+
+        metaStorageManager.remove(stableAssignmentsKey(removedGroup));
+
+        LeaseNegotiator leaseNegotiator = getFieldValue(leaseUpdater, "leaseNegotiator");
+        Map agreementsMap = getFieldValue(leaseNegotiator, "leaseToNegotiate");
+
+        assertTrue(waitForCondition(() -> agreementsMap.isEmpty(), 10_000));
+    }
+
     @Test
     public void testLeasesCleanupOfOneGroupFromMultiple() throws InterruptedException {
         leaseGrantedMessageHandler = (n, lgm) -> completedFuture(createLeaseGrantedMessageResponse(true));
@@ -472,4 +522,10 @@ public class LeaseNegotiationTest extends BaseIgniteAbstractTest {
         assertTrue(lease.isAccepted());
         assertEquals(leaseholderId, lease.getLeaseholderId());
     }
+
+    private <T> T getFieldValue(Object o, String fieldName) throws NoSuchFieldException, IllegalAccessException {
+        Field f = o.getClass().getDeclaredField(fieldName);
+        f.setAccessible(true);
+        return (T) f.get(o);
+    }
 }

Reply via email to