Copilot commented on code in PR #25044:
URL: https://github.com/apache/pulsar/pull/25044#discussion_r2613005399
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCacheTest.java:
##########
@@ -96,14 +106,217 @@ public void testSnapshotCachePruning() {
cache.addNewSnapshot(s3);
cache.addNewSnapshot(s4);
- // Snapshot-1 was already pruned
- assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(1,
1)));
- ReplicatedSubscriptionsSnapshot snapshot =
cache.advancedMarkDeletePosition(PositionFactory.create(2, 2));
+ ReplicatedSubscriptionSnapshotCache.SnapshotResult
+ snapshot =
cache.advancedMarkDeletePosition(PositionFactory.create(2, 2));
assertNotNull(snapshot);
- assertEquals(snapshot.getSnapshotId(), "snapshot-2");
+ // Snapshot-2 was already pruned
+ assertEquals(snapshot.position(), PositionFactory.create(1, 1));
snapshot = cache.advancedMarkDeletePosition(PositionFactory.create(5,
5));
assertNotNull(snapshot);
- assertEquals(snapshot.getSnapshotId(), "snapshot-4");
+ assertEquals(snapshot.position(), PositionFactory.create(4, 4));
}
-}
+
+
+ @Test(timeOut = 15_000)
+ public void testSnapshotCachePruningByKeepingEqualDistance() {
+ int maxSnapshotToCache = 10_000;
+ int addSnapshotCount = 1_000_000;
+
+ ReplicatedSubscriptionSnapshotCache cache =
+ new ReplicatedSubscriptionSnapshotCache("my-subscription",
maxSnapshotToCache,
+ range -> range.upperEndpoint().getEntryId() -
range.lowerEndpoint().getEntryId());
+
+ long ledgerIdCluster1 = 1;
+ long entryIdCluster1 = 0;
+ long ledgerIdCluster2 = 2;
+ long entryIdCluster2 = 0;
+ Random random = new Random();
+
+ // create a large number of snapshots where the entry ids move forward
100 + 0-1000 (random) entries at a time
+ for (int i = 0; i < addSnapshotCount; i++) {
+ ReplicatedSubscriptionsSnapshot snapshot = new
ReplicatedSubscriptionsSnapshot()
+ .setSnapshotId(UUID.randomUUID().toString());
+
snapshot.setLocalMessageId().setLedgerId(ledgerIdCluster1).setEntryId(entryIdCluster1);
+
snapshot.addCluster().setCluster("cluster1").setMessageId().setLedgerId(ledgerIdCluster1)
+ .setEntryId(entryIdCluster1);
+
snapshot.addCluster().setCluster("cluster2").setMessageId().setLedgerId(ledgerIdCluster2)
+ .setEntryId(entryIdCluster2);
+ cache.addNewSnapshot(snapshot);
+ entryIdCluster1 += 100 + random.nextInt(1000);
+ entryIdCluster2 += 100 + random.nextInt(1000);
+ }
+
+ // validate the state of snapshots
+ List<ReplicatedSubscriptionSnapshotCache.SnapshotEntry> snapshots =
cache.getSnapshots();
+ assertEquals(snapshots.size(), maxSnapshotToCache);
+ ReplicatedSubscriptionSnapshotCache.SnapshotEntry second =
snapshots.get(1);
+ ReplicatedSubscriptionSnapshotCache.SnapshotEntry secondLast =
snapshots.get(snapshots.size() - 2);
+ long distance = secondLast.position().getEntryId() -
second.position().getEntryId();
+ long expectedAverageDistance = distance / snapshots.size();
+
+ long maxDistance = 0;
+ long minDistance = Long.MAX_VALUE;
+ for (int i = 0; i < snapshots.size() - 1; i++) {
+ Position position = snapshots.get(i).position();
+ Position nextPosition = snapshots.get(i + 1).position();
+ long distanceToNext = nextPosition.getEntryId() -
position.getEntryId();
+ if (log.isDebugEnabled()) {
+ log.debug(i + ": " + position + " -> " + nextPosition + "
distance to next: " + distanceToNext
+ + " to previous: " +
snapshots.get(i).distanceToPrevious());
+ }
+ maxDistance = Math.max(maxDistance, distanceToNext);
+ minDistance = Math.min(minDistance, distanceToNext);
+
+ // ensure that each snapshot is within 2 * expected average
distance from the previous one
+ ReplicatedSubscriptionSnapshotCache.SnapshotEntry snapshotEntry =
snapshots.get(i);
+ assertThat(snapshotEntry.distanceToPrevious()).describedAs(
+ "distance to previous for snapshot entry: %s is
not expected", snapshotEntry)
+ .isLessThanOrEqualTo(expectedAverageDistance * 2);
+ }
+
+ log.info("Average distance, expected: {}", expectedAverageDistance);
+ log.info("Min distance: {}", minDistance);
+ log.info("Max distance: {}", maxDistance);
+
+ // check that picking a random markDeletePosition within the range of
the second snapshot will result in a
+ // snapshot that is within 2 * expectedAverageDistance from the
markDeletePosition
+ Position markDeletePosition =
+ PositionFactory.create(ledgerIdCluster1,
second.position().getEntryId() + random.nextLong(distance));
Review Comment:
The usage of `random.nextLong(distance)` can throw an
IllegalArgumentException if `distance` is less than or equal to 0. In the
current context, if `second.position().getEntryId()` equals
`secondLast.position().getEntryId()` (unlikely but theoretically possible in
edge cases), the distance would be 0 or negative, causing this method to fail.
Consider adding a guard condition or using `Math.max(1, distance)` to ensure a
positive bound.
```suggestion
PositionFactory.create(ledgerIdCluster1,
second.position().getEntryId() + random.nextLong(Math.max(1, distance)));
```
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCacheTest.java:
##########
@@ -96,14 +106,217 @@ public void testSnapshotCachePruning() {
cache.addNewSnapshot(s3);
cache.addNewSnapshot(s4);
- // Snapshot-1 was already pruned
- assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(1,
1)));
- ReplicatedSubscriptionsSnapshot snapshot =
cache.advancedMarkDeletePosition(PositionFactory.create(2, 2));
+ ReplicatedSubscriptionSnapshotCache.SnapshotResult
+ snapshot =
cache.advancedMarkDeletePosition(PositionFactory.create(2, 2));
assertNotNull(snapshot);
- assertEquals(snapshot.getSnapshotId(), "snapshot-2");
+ // Snapshot-2 was already pruned
+ assertEquals(snapshot.position(), PositionFactory.create(1, 1));
snapshot = cache.advancedMarkDeletePosition(PositionFactory.create(5,
5));
assertNotNull(snapshot);
- assertEquals(snapshot.getSnapshotId(), "snapshot-4");
+ assertEquals(snapshot.position(), PositionFactory.create(4, 4));
}
-}
+
+
+ @Test(timeOut = 15_000)
+ public void testSnapshotCachePruningByKeepingEqualDistance() {
+ int maxSnapshotToCache = 10_000;
+ int addSnapshotCount = 1_000_000;
+
+ ReplicatedSubscriptionSnapshotCache cache =
+ new ReplicatedSubscriptionSnapshotCache("my-subscription",
maxSnapshotToCache,
+ range -> range.upperEndpoint().getEntryId() -
range.lowerEndpoint().getEntryId());
+
+ long ledgerIdCluster1 = 1;
+ long entryIdCluster1 = 0;
+ long ledgerIdCluster2 = 2;
+ long entryIdCluster2 = 0;
+ Random random = new Random();
+
+ // create a large number of snapshots where the entry ids move forward
100 + 0-1000 (random) entries at a time
+ for (int i = 0; i < addSnapshotCount; i++) {
+ ReplicatedSubscriptionsSnapshot snapshot = new
ReplicatedSubscriptionsSnapshot()
+ .setSnapshotId(UUID.randomUUID().toString());
+
snapshot.setLocalMessageId().setLedgerId(ledgerIdCluster1).setEntryId(entryIdCluster1);
+
snapshot.addCluster().setCluster("cluster1").setMessageId().setLedgerId(ledgerIdCluster1)
+ .setEntryId(entryIdCluster1);
+
snapshot.addCluster().setCluster("cluster2").setMessageId().setLedgerId(ledgerIdCluster2)
+ .setEntryId(entryIdCluster2);
+ cache.addNewSnapshot(snapshot);
+ entryIdCluster1 += 100 + random.nextInt(1000);
+ entryIdCluster2 += 100 + random.nextInt(1000);
+ }
+
+ // validate the state of snapshots
+ List<ReplicatedSubscriptionSnapshotCache.SnapshotEntry> snapshots =
cache.getSnapshots();
+ assertEquals(snapshots.size(), maxSnapshotToCache);
+ ReplicatedSubscriptionSnapshotCache.SnapshotEntry second =
snapshots.get(1);
+ ReplicatedSubscriptionSnapshotCache.SnapshotEntry secondLast =
snapshots.get(snapshots.size() - 2);
+ long distance = secondLast.position().getEntryId() -
second.position().getEntryId();
+ long expectedAverageDistance = distance / snapshots.size();
+
+ long maxDistance = 0;
+ long minDistance = Long.MAX_VALUE;
+ for (int i = 0; i < snapshots.size() - 1; i++) {
+ Position position = snapshots.get(i).position();
+ Position nextPosition = snapshots.get(i + 1).position();
+ long distanceToNext = nextPosition.getEntryId() -
position.getEntryId();
+ if (log.isDebugEnabled()) {
+ log.debug(i + ": " + position + " -> " + nextPosition + "
distance to next: " + distanceToNext
+ + " to previous: " +
snapshots.get(i).distanceToPrevious());
+ }
+ maxDistance = Math.max(maxDistance, distanceToNext);
+ minDistance = Math.min(minDistance, distanceToNext);
+
+ // ensure that each snapshot is within 2 * expected average
distance from the previous one
+ ReplicatedSubscriptionSnapshotCache.SnapshotEntry snapshotEntry =
snapshots.get(i);
+ assertThat(snapshotEntry.distanceToPrevious()).describedAs(
+ "distance to previous for snapshot entry: %s is
not expected", snapshotEntry)
+ .isLessThanOrEqualTo(expectedAverageDistance * 2);
+ }
+
+ log.info("Average distance, expected: {}", expectedAverageDistance);
+ log.info("Min distance: {}", minDistance);
+ log.info("Max distance: {}", maxDistance);
+
+ // check that picking a random markDeletePosition within the range of
the second snapshot will result in a
+ // snapshot that is within 2 * expectedAverageDistance from the
markDeletePosition
+ Position markDeletePosition =
+ PositionFactory.create(ledgerIdCluster1,
second.position().getEntryId() + random.nextLong(distance));
+
+
assertThat(cache.advancedMarkDeletePosition(markDeletePosition)).satisfies(snapshotResult
-> {
+ long snapshotDistance = markDeletePosition.getEntryId() -
snapshotResult.position().getEntryId();
+ assertThat(snapshotDistance).describedAs("snapshot result: %s
markDeletePosition: %s", snapshotResult,
+
markDeletePosition).isLessThanOrEqualTo(expectedAverageDistance * 2);
+ });
+
+ }
+
+ @Test
+ public void testSnapshotCachePruningScenarios() {
+ ReplicatedSubscriptionSnapshotCache cache = new
ReplicatedSubscriptionSnapshotCache("my-subscription", 5,
+ range -> range.upperEndpoint().getEntryId() -
range.lowerEndpoint().getEntryId());
+
+ ReplicatedSubscriptionsSnapshot s1 = new
ReplicatedSubscriptionsSnapshot();
+ s1.setLocalMessageId().setLedgerId(1).setEntryId(1);
+ cache.addNewSnapshot(s1);
+
+ ReplicatedSubscriptionsSnapshot s2 = new
ReplicatedSubscriptionsSnapshot();
+ s2.setLocalMessageId().setLedgerId(1).setEntryId(2);
+ cache.addNewSnapshot(s2);
+
+ ReplicatedSubscriptionsSnapshot s3 = new
ReplicatedSubscriptionsSnapshot();
+ s3.setLocalMessageId().setLedgerId(1).setEntryId(10);
+ cache.addNewSnapshot(s3);
+
+ ReplicatedSubscriptionsSnapshot s4 = new
ReplicatedSubscriptionsSnapshot();
+ s4.setLocalMessageId().setLedgerId(1).setEntryId(15);
+ cache.addNewSnapshot(s4);
+
+ ReplicatedSubscriptionsSnapshot s5 = new
ReplicatedSubscriptionsSnapshot();
+ s5.setLocalMessageId().setLedgerId(1).setEntryId(25);
+ cache.addNewSnapshot(s5);
+
+ ReplicatedSubscriptionsSnapshot s6 = new
ReplicatedSubscriptionsSnapshot();
+ s6.setLocalMessageId().setLedgerId(1).setEntryId(100);
+ cache.addNewSnapshot(s6);
+
+ // s2 should be pruned (special case where head is previous to the
removed one)
+ assertThat(cache.getSnapshots()).hasSize(5)
+ .allSatisfy(snapshotEntry ->
assertThat(snapshotEntry.position()).isNotEqualTo(
+ PositionFactory.create(1, 2)));
+
+ ReplicatedSubscriptionsSnapshot s7 = new
ReplicatedSubscriptionsSnapshot();
+ s7.setLocalMessageId().setLedgerId(1).setEntryId(110);
+ cache.addNewSnapshot(s7);
+
+ // s3 should be pruned (ordinary case where middle entry is removed)
+ assertThat(cache.getSnapshots()).hasSize(5)
+ .allSatisfy(snapshotEntry ->
assertThat(snapshotEntry.position()).isNotEqualTo(
+ PositionFactory.create(1, 10)));
+
+ ReplicatedSubscriptionsSnapshot s8 = new
ReplicatedSubscriptionsSnapshot();
+ s8.setLocalMessageId().setLedgerId(1).setEntryId(112);
+ cache.addNewSnapshot(s8);
+
+ // s7 should be pruned (special case where tail is after the removed
one)
+ assertThat(cache.getSnapshots()).hasSize(5)
+ .allSatisfy(snapshotEntry ->
assertThat(snapshotEntry.position()).isNotEqualTo(
+ PositionFactory.create(1, 110)));
+
+
+ ReplicatedSubscriptionsSnapshot s9 = new
ReplicatedSubscriptionsSnapshot();
+ s9.setLocalMessageId().setLedgerId(1).setEntryId(113);
+ cache.addNewSnapshot(s9);
+
+ // s8 should be pruned (check that pruning works after the one before
the tail was removed)
+ assertThat(cache.getSnapshots()).hasSize(5)
+ .allSatisfy(snapshotEntry ->
assertThat(snapshotEntry.position()).isNotEqualTo(
+ PositionFactory.create(1, 112)));
+
+ ReplicatedSubscriptionsSnapshot s10 = new
ReplicatedSubscriptionsSnapshot();
+ s10.setLocalMessageId().setLedgerId(1).setEntryId(200);
+ cache.addNewSnapshot(s10);
+
+ // s4 should be pruned (check that pruning still works immediately
after head)
+ assertThat(cache.getSnapshots()).hasSize(5)
+ .allSatisfy(snapshotEntry ->
assertThat(snapshotEntry.position()).isNotEqualTo(
+ PositionFactory.create(1, 15)));
+
+ ReplicatedSubscriptionsSnapshot s11 = new
ReplicatedSubscriptionsSnapshot();
+ // entry id that is before the tail
+ s11.setLocalMessageId().setLedgerId(1).setEntryId(50);
+ cache.addNewSnapshot(s11);
+
+ // all snapshots should be pruned, and s11 should be the only one
+ assertThat(cache.getSnapshots()).hasSize(1)
+ .first().satisfies(snapshotEntry ->
assertThat(snapshotEntry.position()).isEqualTo(
+ PositionFactory.create(1, 50)));
+ }
+
+ @Test(timeOut = 15_000)
+ public void testSnapshotCacheStressTest() {
+ int maxSnapshotToCache = 10_000;
+ int addSnapshotCount = 1_000_000;
+
+ ReplicatedSubscriptionSnapshotCache cache =
+ new ReplicatedSubscriptionSnapshotCache("my-subscription",
maxSnapshotToCache,
+ range -> range.upperEndpoint().getEntryId() -
range.lowerEndpoint().getEntryId());
+
+ long ledgerIdCluster1 = 1;
+ long entryIdCluster1 = 0;
+ long ledgerIdCluster2 = 2;
+ long entryIdCluster2 = 0;
+ Random random = new Random();
+
+ int addedSnapshots = 0;
+ long markDeletePositionEntryId = 0;
+ long firstSnapshotEntryId = -1L;
+
+ while (addedSnapshots < addSnapshotCount) {
+ // fill up the cache with random number of entries
+ int addInThisRount = 1 + random.nextInt(2 * maxSnapshotToCache);
+ for (int i = 0; i < addInThisRount; i++) {
+ ReplicatedSubscriptionsSnapshot snapshot = new
ReplicatedSubscriptionsSnapshot()
+ .setSnapshotId(UUID.randomUUID().toString());
+
snapshot.setLocalMessageId().setLedgerId(ledgerIdCluster1).setEntryId(entryIdCluster1);
+
snapshot.addCluster().setCluster("cluster1").setMessageId().setLedgerId(ledgerIdCluster1)
+ .setEntryId(entryIdCluster1);
+
snapshot.addCluster().setCluster("cluster2").setMessageId().setLedgerId(ledgerIdCluster2)
+ .setEntryId(entryIdCluster2);
+ cache.addNewSnapshot(snapshot);
+ if (firstSnapshotEntryId == -1L) {
+ firstSnapshotEntryId = entryIdCluster1;
+ }
+ addedSnapshots++;
+ entryIdCluster1 += 100 + random.nextInt(1000);
+ entryIdCluster2 += 100 + random.nextInt(1000);
+ }
+ markDeletePositionEntryId = firstSnapshotEntryId +
random.nextLong(entryIdCluster1 - firstSnapshotEntryId);
Review Comment:
The usage of `random.nextLong(entryIdCluster1 - firstSnapshotEntryId)` can
throw an IllegalArgumentException if the bound is less than or equal to 0. In
scenarios where `entryIdCluster1` equals `firstSnapshotEntryId` (which could
happen if the loop only adds one snapshot), this will cause a test failure.
Consider adding a guard condition to ensure the bound is positive before
calling nextLong.
```suggestion
long bound = entryIdCluster1 - firstSnapshotEntryId;
if (bound > 0) {
markDeletePositionEntryId = firstSnapshotEntryId +
random.nextLong(bound);
} else {
markDeletePositionEntryId = firstSnapshotEntryId;
}
```
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCacheTest.java:
##########
@@ -96,14 +106,217 @@ public void testSnapshotCachePruning() {
cache.addNewSnapshot(s3);
cache.addNewSnapshot(s4);
- // Snapshot-1 was already pruned
- assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(1,
1)));
- ReplicatedSubscriptionsSnapshot snapshot =
cache.advancedMarkDeletePosition(PositionFactory.create(2, 2));
+ ReplicatedSubscriptionSnapshotCache.SnapshotResult
+ snapshot =
cache.advancedMarkDeletePosition(PositionFactory.create(2, 2));
assertNotNull(snapshot);
- assertEquals(snapshot.getSnapshotId(), "snapshot-2");
+ // Snapshot-2 was already pruned
+ assertEquals(snapshot.position(), PositionFactory.create(1, 1));
snapshot = cache.advancedMarkDeletePosition(PositionFactory.create(5,
5));
assertNotNull(snapshot);
- assertEquals(snapshot.getSnapshotId(), "snapshot-4");
+ assertEquals(snapshot.position(), PositionFactory.create(4, 4));
}
-}
+
+
+ @Test(timeOut = 15_000)
+ public void testSnapshotCachePruningByKeepingEqualDistance() {
+ int maxSnapshotToCache = 10_000;
+ int addSnapshotCount = 1_000_000;
+
+ ReplicatedSubscriptionSnapshotCache cache =
+ new ReplicatedSubscriptionSnapshotCache("my-subscription",
maxSnapshotToCache,
+ range -> range.upperEndpoint().getEntryId() -
range.lowerEndpoint().getEntryId());
+
+ long ledgerIdCluster1 = 1;
+ long entryIdCluster1 = 0;
+ long ledgerIdCluster2 = 2;
+ long entryIdCluster2 = 0;
+ Random random = new Random();
+
+ // create a large number of snapshots where the entry ids move forward
100 + 0-1000 (random) entries at a time
+ for (int i = 0; i < addSnapshotCount; i++) {
+ ReplicatedSubscriptionsSnapshot snapshot = new
ReplicatedSubscriptionsSnapshot()
+ .setSnapshotId(UUID.randomUUID().toString());
+
snapshot.setLocalMessageId().setLedgerId(ledgerIdCluster1).setEntryId(entryIdCluster1);
+
snapshot.addCluster().setCluster("cluster1").setMessageId().setLedgerId(ledgerIdCluster1)
+ .setEntryId(entryIdCluster1);
+
snapshot.addCluster().setCluster("cluster2").setMessageId().setLedgerId(ledgerIdCluster2)
+ .setEntryId(entryIdCluster2);
+ cache.addNewSnapshot(snapshot);
+ entryIdCluster1 += 100 + random.nextInt(1000);
+ entryIdCluster2 += 100 + random.nextInt(1000);
+ }
+
+ // validate the state of snapshots
+ List<ReplicatedSubscriptionSnapshotCache.SnapshotEntry> snapshots =
cache.getSnapshots();
+ assertEquals(snapshots.size(), maxSnapshotToCache);
+ ReplicatedSubscriptionSnapshotCache.SnapshotEntry second =
snapshots.get(1);
+ ReplicatedSubscriptionSnapshotCache.SnapshotEntry secondLast =
snapshots.get(snapshots.size() - 2);
+ long distance = secondLast.position().getEntryId() -
second.position().getEntryId();
+ long expectedAverageDistance = distance / snapshots.size();
+
+ long maxDistance = 0;
+ long minDistance = Long.MAX_VALUE;
+ for (int i = 0; i < snapshots.size() - 1; i++) {
+ Position position = snapshots.get(i).position();
+ Position nextPosition = snapshots.get(i + 1).position();
+ long distanceToNext = nextPosition.getEntryId() -
position.getEntryId();
+ if (log.isDebugEnabled()) {
+ log.debug(i + ": " + position + " -> " + nextPosition + "
distance to next: " + distanceToNext
+ + " to previous: " +
snapshots.get(i).distanceToPrevious());
+ }
+ maxDistance = Math.max(maxDistance, distanceToNext);
+ minDistance = Math.min(minDistance, distanceToNext);
+
+ // ensure that each snapshot is within 2 * expected average
distance from the previous one
+ ReplicatedSubscriptionSnapshotCache.SnapshotEntry snapshotEntry =
snapshots.get(i);
+ assertThat(snapshotEntry.distanceToPrevious()).describedAs(
+ "distance to previous for snapshot entry: %s is
not expected", snapshotEntry)
+ .isLessThanOrEqualTo(expectedAverageDistance * 2);
+ }
+
+ log.info("Average distance, expected: {}", expectedAverageDistance);
+ log.info("Min distance: {}", minDistance);
+ log.info("Max distance: {}", maxDistance);
+
+ // check that picking a random markDeletePosition within the range of
the second snapshot will result in a
+ // snapshot that is within 2 * expectedAverageDistance from the
markDeletePosition
+ Position markDeletePosition =
+ PositionFactory.create(ledgerIdCluster1,
second.position().getEntryId() + random.nextLong(distance));
+
+
assertThat(cache.advancedMarkDeletePosition(markDeletePosition)).satisfies(snapshotResult
-> {
+ long snapshotDistance = markDeletePosition.getEntryId() -
snapshotResult.position().getEntryId();
+ assertThat(snapshotDistance).describedAs("snapshot result: %s
markDeletePosition: %s", snapshotResult,
+
markDeletePosition).isLessThanOrEqualTo(expectedAverageDistance * 2);
+ });
+
+ }
+
+ @Test
+ public void testSnapshotCachePruningScenarios() {
+ ReplicatedSubscriptionSnapshotCache cache = new
ReplicatedSubscriptionSnapshotCache("my-subscription", 5,
+ range -> range.upperEndpoint().getEntryId() -
range.lowerEndpoint().getEntryId());
+
+ ReplicatedSubscriptionsSnapshot s1 = new
ReplicatedSubscriptionsSnapshot();
+ s1.setLocalMessageId().setLedgerId(1).setEntryId(1);
+ cache.addNewSnapshot(s1);
+
+ ReplicatedSubscriptionsSnapshot s2 = new
ReplicatedSubscriptionsSnapshot();
+ s2.setLocalMessageId().setLedgerId(1).setEntryId(2);
+ cache.addNewSnapshot(s2);
+
+ ReplicatedSubscriptionsSnapshot s3 = new
ReplicatedSubscriptionsSnapshot();
+ s3.setLocalMessageId().setLedgerId(1).setEntryId(10);
+ cache.addNewSnapshot(s3);
+
+ ReplicatedSubscriptionsSnapshot s4 = new
ReplicatedSubscriptionsSnapshot();
+ s4.setLocalMessageId().setLedgerId(1).setEntryId(15);
+ cache.addNewSnapshot(s4);
+
+ ReplicatedSubscriptionsSnapshot s5 = new
ReplicatedSubscriptionsSnapshot();
+ s5.setLocalMessageId().setLedgerId(1).setEntryId(25);
+ cache.addNewSnapshot(s5);
+
+ ReplicatedSubscriptionsSnapshot s6 = new
ReplicatedSubscriptionsSnapshot();
+ s6.setLocalMessageId().setLedgerId(1).setEntryId(100);
+ cache.addNewSnapshot(s6);
+
+ // s2 should be pruned (special case where head is previous to the
removed one)
+ assertThat(cache.getSnapshots()).hasSize(5)
+ .allSatisfy(snapshotEntry ->
assertThat(snapshotEntry.position()).isNotEqualTo(
+ PositionFactory.create(1, 2)));
+
+ ReplicatedSubscriptionsSnapshot s7 = new
ReplicatedSubscriptionsSnapshot();
+ s7.setLocalMessageId().setLedgerId(1).setEntryId(110);
+ cache.addNewSnapshot(s7);
+
+ // s3 should be pruned (ordinary case where middle entry is removed)
+ assertThat(cache.getSnapshots()).hasSize(5)
+ .allSatisfy(snapshotEntry ->
assertThat(snapshotEntry.position()).isNotEqualTo(
+ PositionFactory.create(1, 10)));
+
+ ReplicatedSubscriptionsSnapshot s8 = new
ReplicatedSubscriptionsSnapshot();
+ s8.setLocalMessageId().setLedgerId(1).setEntryId(112);
+ cache.addNewSnapshot(s8);
+
+ // s7 should be pruned (special case where tail is after the removed
one)
+ assertThat(cache.getSnapshots()).hasSize(5)
+ .allSatisfy(snapshotEntry ->
assertThat(snapshotEntry.position()).isNotEqualTo(
+ PositionFactory.create(1, 110)));
+
+
+ ReplicatedSubscriptionsSnapshot s9 = new
ReplicatedSubscriptionsSnapshot();
+ s9.setLocalMessageId().setLedgerId(1).setEntryId(113);
+ cache.addNewSnapshot(s9);
+
+ // s8 should be pruned (check that pruning works after the one before
the tail was removed)
+ assertThat(cache.getSnapshots()).hasSize(5)
+ .allSatisfy(snapshotEntry ->
assertThat(snapshotEntry.position()).isNotEqualTo(
+ PositionFactory.create(1, 112)));
+
+ ReplicatedSubscriptionsSnapshot s10 = new
ReplicatedSubscriptionsSnapshot();
+ s10.setLocalMessageId().setLedgerId(1).setEntryId(200);
+ cache.addNewSnapshot(s10);
+
+ // s4 should be pruned (check that pruning still works immediately
after head)
+ assertThat(cache.getSnapshots()).hasSize(5)
+ .allSatisfy(snapshotEntry ->
assertThat(snapshotEntry.position()).isNotEqualTo(
+ PositionFactory.create(1, 15)));
+
+ ReplicatedSubscriptionsSnapshot s11 = new
ReplicatedSubscriptionsSnapshot();
+ // entry id that is before the tail
+ s11.setLocalMessageId().setLedgerId(1).setEntryId(50);
+ cache.addNewSnapshot(s11);
+
+ // all snapshots should be pruned, and s11 should be the only one
+ assertThat(cache.getSnapshots()).hasSize(1)
+ .first().satisfies(snapshotEntry ->
assertThat(snapshotEntry.position()).isEqualTo(
+ PositionFactory.create(1, 50)));
+ }
+
+ @Test(timeOut = 15_000)
+ public void testSnapshotCacheStressTest() {
+ int maxSnapshotToCache = 10_000;
+ int addSnapshotCount = 1_000_000;
+
+ ReplicatedSubscriptionSnapshotCache cache =
+ new ReplicatedSubscriptionSnapshotCache("my-subscription",
maxSnapshotToCache,
+ range -> range.upperEndpoint().getEntryId() -
range.lowerEndpoint().getEntryId());
+
+ long ledgerIdCluster1 = 1;
+ long entryIdCluster1 = 0;
+ long ledgerIdCluster2 = 2;
+ long entryIdCluster2 = 0;
+ Random random = new Random();
+
+ int addedSnapshots = 0;
+ long markDeletePositionEntryId = 0;
+ long firstSnapshotEntryId = -1L;
+
+ while (addedSnapshots < addSnapshotCount) {
+ // fill up the cache with random number of entries
+ int addInThisRount = 1 + random.nextInt(2 * maxSnapshotToCache);
+ for (int i = 0; i < addInThisRount; i++) {
Review Comment:
There is a spelling error: "addInThisRount" should be "addInThisRound".
```suggestion
int addInThisRound = 1 + random.nextInt(2 * maxSnapshotToCache);
for (int i = 0; i < addInThisRound; i++) {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]