This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 154c99c28a IGNITE-20430 Got rid of unused set and fixed replica
waiters removal (#2604)
154c99c28a is described below
commit 154c99c28a6166307c89d07fd380eda639b1cdd3
Author: Denis Chudov <[email protected]>
AuthorDate: Thu Oct 19 10:36:36 2023 +0300
IGNITE-20430 Got rid of unused set and fixed replica waiters removal (#2604)
---
.../placementdriver/leases/LeaseTracker.java | 41 +++++----
.../internal/placementdriver/LeaseTrackerTest.java | 102 +++++++++++++++++++++
2 files changed, 123 insertions(+), 20 deletions(-)
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
index c05c3271ee..0e045e2f58 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
@@ -35,11 +35,8 @@ import static
org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@@ -47,6 +44,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.event.AbstractEventProducer;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.Entry;
@@ -170,13 +168,10 @@ public class LeaseTracker extends
AbstractEventProducer<PrimaryReplicaEvent, Pri
LeaseBatch leaseBatch =
LeaseBatch.fromBytes(ByteBuffer.wrap(leasesBytes).order(LITTLE_ENDIAN));
- Set<ReplicationGroupId> actualGroups = new HashSet<>();
-
Map<ReplicationGroupId, Lease> previousLeasesMap =
leases.leaseByGroupId();
for (Lease lease : leaseBatch.leases()) {
ReplicationGroupId grpId = lease.replicationGroupId();
- actualGroups.add(grpId);
leasesMap.put(grpId, lease);
@@ -190,15 +185,14 @@ public class LeaseTracker extends
AbstractEventProducer<PrimaryReplicaEvent, Pri
}
}
- firePrimaryReplicaExpiredEventIfNeed(event.revision(),
lease);
+ firePrimaryReplicaExpiredEventIfNeeded(grpId,
event.revision(), lease);
}
- for (Iterator<Map.Entry<ReplicationGroupId, Lease>>
iterator = leasesMap.entrySet().iterator(); iterator.hasNext();) {
- Map.Entry<ReplicationGroupId, Lease> e =
iterator.next();
+ for (ReplicationGroupId grpId :
leases.leaseByGroupId().keySet()) {
+ if (!leasesMap.containsKey(grpId)) {
+ tryRemoveTracker(grpId);
- if (!actualGroups.contains(e.getKey())) {
- iterator.remove();
- tryRemoveTracker(e.getKey());
+ firePrimaryReplicaExpiredEventIfNeeded(grpId,
event.revision(), null);
}
}
@@ -310,20 +304,27 @@ public class LeaseTracker extends
AbstractEventProducer<PrimaryReplicaEvent, Pri
/**
* Fires the primary replica expire event if it needs.
*
+ * @param grpId Group id, used for the cases when the {@code lease}
parameter is null. Should be always not null.
* @param causalityToken Causality token.
* @param lease Lease to check on expiration.
*/
- private void firePrimaryReplicaExpiredEventIfNeed(long causalityToken,
Lease lease) {
- ReplicationGroupId grpId = lease.replicationGroupId();
+ private void firePrimaryReplicaExpiredEventIfNeeded(ReplicationGroupId
grpId, long causalityToken, @Nullable Lease lease) {
+ assert lease == null || lease.replicationGroupId().equals(grpId)
+ : IgniteStringFormatter.format("Group id mismatch [groupId={},
lease={}]", grpId, lease);
+
Lease currentLease = leases.leaseByGroupId().get(grpId);
- if (currentLease != null && currentLease.isAccepted() &&
!currentLease.getStartTime().equals(lease.getStartTime())) {
- CompletableFuture<Void> prev = expirationFutureByGroup.put(grpId,
fireEvent(
- PRIMARY_REPLICA_EXPIRED,
- new PrimaryReplicaEventParameters(causalityToken, grpId,
currentLease.getLeaseholder())
- ));
+ if (currentLease != null && currentLease.isAccepted()) {
+ boolean sameLease = lease != null &&
currentLease.getStartTime().equals(lease.getStartTime());
- assert prev == null || prev.isDone() : "Previous lease expiration
process has not completed yet [grpId=" + grpId + ']';
+ if (!sameLease) {
+ CompletableFuture<Void> prev =
expirationFutureByGroup.put(grpId, fireEvent(
+ PRIMARY_REPLICA_EXPIRED,
+ new PrimaryReplicaEventParameters(causalityToken,
grpId, currentLease.getLeaseholder())
+ ));
+
+ assert prev == null || prev.isDone() : "Previous lease
expiration process has not completed yet [grpId=" + grpId + ']';
+ }
}
}
diff --git
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseTrackerTest.java
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseTrackerTest.java
new file mode 100644
index 0000000000..7974b386f9
--- /dev/null
+++
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseTrackerTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static
org.apache.ignite.internal.placementdriver.PlacementDriverManager.PLACEMENTDRIVER_LEASES_KEY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.EntryEvent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.impl.EntryImpl;
+import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
+import
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
+import org.apache.ignite.internal.placementdriver.leases.Lease;
+import org.apache.ignite.internal.placementdriver.leases.LeaseBatch;
+import org.apache.ignite.internal.placementdriver.leases.LeaseTracker;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for lease tracker.
+ */
+public class LeaseTrackerTest extends BaseIgniteAbstractTest {
+ @Test
+ public void testLeaseCleanup() {
+ AtomicReference<WatchListener> listenerRef = new AtomicReference<>();
+ MetaStorageManager msManager = mock(MetaStorageManager.class);
+
+ doAnswer(
+ invocation -> {
+ WatchListener lsnr = invocation.getArgument(1);
+ listenerRef.set(lsnr);
+ return null;
+ }
+ ).when(msManager).registerPrefixWatch(any(), any());
+
+ Entry emptyEntry = EntryImpl.empty(PLACEMENTDRIVER_LEASES_KEY.bytes());
+
+ when(msManager.getLocally(any(), anyLong())).thenAnswer(invocation ->
emptyEntry);
+
+ LeaseTracker leaseTracker = new LeaseTracker(msManager);
+ leaseTracker.startTrack(0L);
+
+ AtomicReference<PrimaryReplicaEventParameters> parametersRef = new
AtomicReference<>();
+ leaseTracker.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, (p,
e) -> {
+ parametersRef.set(p);
+ return completedFuture(false);
+ });
+
+ TablePartitionId partId0 = new TablePartitionId(0, 0);
+ Lease lease0 = new Lease("notAccepted", new HybridTimestamp(1, 0), new
HybridTimestamp(1000, 0), partId0);
+ TablePartitionId partId1 = new TablePartitionId(0, 1);
+ Lease lease1 = new Lease("accepted", new HybridTimestamp(1, 0), new
HybridTimestamp(1000, 0), partId1)
+ .acceptLease(new HybridTimestamp(2000, 0));
+
+ // In entry0, there are leases for partition ids partId0 and partId1.
In entry1, there is only partId0, so partId1 is expired.
+ Entry entry0 = new EntryImpl(PLACEMENTDRIVER_LEASES_KEY.bytes(), new
LeaseBatch(List.of(lease0, lease1)).bytes(), 0, 0);
+ Entry entry1 = new EntryImpl(PLACEMENTDRIVER_LEASES_KEY.bytes(), new
LeaseBatch(List.of(lease0)).bytes(), 0, 1);
+ listenerRef.get().onUpdate(new WatchEvent(new EntryEvent(emptyEntry,
entry0)));
+
+ assertNull(parametersRef.get());
+
+ // Check that the absence of accepted lease triggers the event.
+ listenerRef.get().onUpdate(new WatchEvent(new EntryEvent(emptyEntry,
entry1)));
+ assertNotNull(parametersRef.get());
+ assertEquals(partId1, parametersRef.get().groupId());
+
+ // Check that the absence of not accepted lease doesn't trigger the
event.
+ parametersRef.set(null);
+ listenerRef.get().onUpdate(new WatchEvent(new EntryEvent(emptyEntry,
emptyEntry)));
+ assertNull(parametersRef.get());
+ }
+}