This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 154c99c28a IGNITE-20430 Got rid of unused set and fixed replica waiters removal (#2604)
154c99c28a is described below

commit 154c99c28a6166307c89d07fd380eda639b1cdd3
Author: Denis Chudov <[email protected]>
AuthorDate: Thu Oct 19 10:36:36 2023 +0300

    IGNITE-20430 Got rid of unused set and fixed replica waiters removal (#2604)
---
 .../placementdriver/leases/LeaseTracker.java       |  41 +++++----
 .../internal/placementdriver/LeaseTrackerTest.java | 102 +++++++++++++++++++++
 2 files changed, 123 insertions(+), 20 deletions(-)

diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
index c05c3271ee..0e045e2f58 100644
--- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
+++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
@@ -35,11 +35,8 @@ import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
@@ -47,6 +44,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.internal.event.AbstractEventProducer;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.IgniteStringFormatter;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.Entry;
@@ -170,13 +168,10 @@ public class LeaseTracker extends 
AbstractEventProducer<PrimaryReplicaEvent, Pri
 
                     LeaseBatch leaseBatch = 
LeaseBatch.fromBytes(ByteBuffer.wrap(leasesBytes).order(LITTLE_ENDIAN));
 
-                    Set<ReplicationGroupId> actualGroups = new HashSet<>();
-
                     Map<ReplicationGroupId, Lease> previousLeasesMap = 
leases.leaseByGroupId();
 
                     for (Lease lease : leaseBatch.leases()) {
                         ReplicationGroupId grpId = lease.replicationGroupId();
-                        actualGroups.add(grpId);
 
                         leasesMap.put(grpId, lease);
 
@@ -190,15 +185,14 @@ public class LeaseTracker extends 
AbstractEventProducer<PrimaryReplicaEvent, Pri
                             }
                         }
 
-                        firePrimaryReplicaExpiredEventIfNeed(event.revision(), 
lease);
+                        firePrimaryReplicaExpiredEventIfNeeded(grpId, 
event.revision(), lease);
                     }
 
-                    for (Iterator<Map.Entry<ReplicationGroupId, Lease>> 
iterator = leasesMap.entrySet().iterator(); iterator.hasNext();) {
-                        Map.Entry<ReplicationGroupId, Lease> e = 
iterator.next();
+                    for (ReplicationGroupId grpId : 
leases.leaseByGroupId().keySet()) {
+                        if (!leasesMap.containsKey(grpId)) {
+                            tryRemoveTracker(grpId);
 
-                        if (!actualGroups.contains(e.getKey())) {
-                            iterator.remove();
-                            tryRemoveTracker(e.getKey());
+                            firePrimaryReplicaExpiredEventIfNeeded(grpId, 
event.revision(), null);
                         }
                     }
 
@@ -310,20 +304,27 @@ public class LeaseTracker extends 
AbstractEventProducer<PrimaryReplicaEvent, Pri
     /**
      * Fires the primary replica expire event if it needs.
      *
+     * @param grpId Group id, used for the cases when the {@code lease} 
parameter is null. Should be always not null.
      * @param causalityToken Causality token.
      * @param lease Lease to check on expiration.
      */
-    private void firePrimaryReplicaExpiredEventIfNeed(long causalityToken, 
Lease lease) {
-        ReplicationGroupId grpId = lease.replicationGroupId();
+    private void firePrimaryReplicaExpiredEventIfNeeded(ReplicationGroupId 
grpId, long causalityToken, @Nullable Lease lease) {
+        assert lease == null || lease.replicationGroupId().equals(grpId)
+                : IgniteStringFormatter.format("Group id mismatch [groupId={}, 
lease={}]", grpId, lease);
+
         Lease currentLease = leases.leaseByGroupId().get(grpId);
 
-        if (currentLease != null && currentLease.isAccepted() && 
!currentLease.getStartTime().equals(lease.getStartTime())) {
-            CompletableFuture<Void> prev = expirationFutureByGroup.put(grpId, 
fireEvent(
-                    PRIMARY_REPLICA_EXPIRED,
-                    new PrimaryReplicaEventParameters(causalityToken, grpId, 
currentLease.getLeaseholder())
-            ));
+        if (currentLease != null && currentLease.isAccepted()) {
+            boolean sameLease = lease != null && 
currentLease.getStartTime().equals(lease.getStartTime());
 
-            assert prev == null || prev.isDone() : "Previous lease expiration 
process has not completed yet [grpId=" + grpId + ']';
+            if (!sameLease) {
+                CompletableFuture<Void> prev = 
expirationFutureByGroup.put(grpId, fireEvent(
+                        PRIMARY_REPLICA_EXPIRED,
+                        new PrimaryReplicaEventParameters(causalityToken, 
grpId, currentLease.getLeaseholder())
+                ));
+
+                assert prev == null || prev.isDone() : "Previous lease 
expiration process has not completed yet [grpId=" + grpId + ']';
+            }
         }
     }
 
diff --git a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseTrackerTest.java b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseTrackerTest.java
new file mode 100644
index 0000000000..7974b386f9
--- /dev/null
+++ b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseTrackerTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.internal.placementdriver.PlacementDriverManager.PLACEMENTDRIVER_LEASES_KEY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.EntryEvent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.impl.EntryImpl;
+import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
+import 
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
+import org.apache.ignite.internal.placementdriver.leases.Lease;
+import org.apache.ignite.internal.placementdriver.leases.LeaseBatch;
+import org.apache.ignite.internal.placementdriver.leases.LeaseTracker;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for lease tracker.
+ */
+public class LeaseTrackerTest extends BaseIgniteAbstractTest {
+    @Test
+    public void testLeaseCleanup() {
+        AtomicReference<WatchListener> listenerRef = new AtomicReference<>();
+        MetaStorageManager msManager = mock(MetaStorageManager.class);
+
+        doAnswer(
+                invocation -> {
+                    WatchListener lsnr = invocation.getArgument(1);
+                    listenerRef.set(lsnr);
+                    return null;
+                }
+        ).when(msManager).registerPrefixWatch(any(), any());
+
+        Entry emptyEntry = EntryImpl.empty(PLACEMENTDRIVER_LEASES_KEY.bytes());
+
+        when(msManager.getLocally(any(), anyLong())).thenAnswer(invocation -> 
emptyEntry);
+
+        LeaseTracker leaseTracker = new LeaseTracker(msManager);
+        leaseTracker.startTrack(0L);
+
+        AtomicReference<PrimaryReplicaEventParameters> parametersRef = new 
AtomicReference<>();
+        leaseTracker.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, (p, 
e) -> {
+            parametersRef.set(p);
+            return completedFuture(false);
+        });
+
+        TablePartitionId partId0 = new TablePartitionId(0, 0);
+        Lease lease0 = new Lease("notAccepted", new HybridTimestamp(1, 0), new 
HybridTimestamp(1000, 0), partId0);
+        TablePartitionId partId1 = new TablePartitionId(0, 1);
+        Lease lease1 = new Lease("accepted", new HybridTimestamp(1, 0), new 
HybridTimestamp(1000, 0), partId1)
+                .acceptLease(new HybridTimestamp(2000, 0));
+
+        // In entry0, there are leases for partition ids partId0 and partId1. 
In entry1, there is only partId0, so partId1 is expired.
+        Entry entry0 = new EntryImpl(PLACEMENTDRIVER_LEASES_KEY.bytes(), new 
LeaseBatch(List.of(lease0, lease1)).bytes(), 0, 0);
+        Entry entry1 = new EntryImpl(PLACEMENTDRIVER_LEASES_KEY.bytes(), new 
LeaseBatch(List.of(lease0)).bytes(), 0, 1);
+        listenerRef.get().onUpdate(new WatchEvent(new EntryEvent(emptyEntry, 
entry0)));
+
+        assertNull(parametersRef.get());
+
+        // Check that the absence of accepted lease triggers the event.
+        listenerRef.get().onUpdate(new WatchEvent(new EntryEvent(emptyEntry, 
entry1)));
+        assertNotNull(parametersRef.get());
+        assertEquals(partId1, parametersRef.get().groupId());
+
+        // Check that the absence of not accepted lease doesn't trigger the 
event.
+        parametersRef.set(null);
+        listenerRef.get().onUpdate(new WatchEvent(new EntryEvent(emptyEntry, 
emptyEntry)));
+        assertNull(parametersRef.get());
+    }
+}

Reply via email to