This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 63450269932 [improve][broker] Improve replicated subscription snapshot 
cache so that subscriptions can be replicated when mark delete position update 
is not frequent (#25044)
63450269932 is described below

commit 634502699321469a6816d12a09081d3f63333bba
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Dec 12 12:47:41 2025 +0200

    [improve][broker] Improve replicated subscription snapshot cache so that 
subscriptions can be replicated when mark delete position update is not 
frequent (#25044)
---
 conf/broker.conf                                   |   2 +-
 .../apache/pulsar/broker/ServiceConfiguration.java |   2 +-
 .../service/persistent/PersistentSubscription.java |   6 +-
 .../ReplicatedSubscriptionSnapshotCache.java       | 354 +++++++++++++++++++--
 .../ReplicatedSubscriptionsController.java         |  18 +-
 .../pulsar/broker/service/PersistentTopicTest.java |   2 +
 .../ReplicatedSubscriptionSnapshotCacheTest.java   | 241 +++++++++++++-
 7 files changed, 571 insertions(+), 54 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 997556c1740..e4923172d13 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -735,7 +735,7 @@ replicatedSubscriptionsSnapshotFrequencyMillis=1000
 replicatedSubscriptionsSnapshotTimeoutSeconds=30
 
 # Max number of snapshot to be cached per subscription.
-replicatedSubscriptionsSnapshotMaxCachedPerSubscription=10
+replicatedSubscriptionsSnapshotMaxCachedPerSubscription=30
 
 # Max memory size for broker handling messages sending from producers.
 # If the processing message size exceed this value, broker will stop read data
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index c6bedcfd01d..e5b7f0e458d 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1548,7 +1548,7 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     @FieldContext(
             category = CATEGORY_SERVER,
             doc = "Max number of snapshot to be cached per subscription.")
-    private int replicatedSubscriptionsSnapshotMaxCachedPerSubscription = 10;
+    private int replicatedSubscriptionsSnapshotMaxCachedPerSubscription = 30;
 
     @FieldContext(
             category = CATEGORY_SERVER,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index a0be5c7c945..457bae5e69c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -209,7 +209,8 @@ public class PersistentSubscription extends 
AbstractSubscription {
             this.replicatedSubscriptionSnapshotCache = null;
         } else if (this.replicatedSubscriptionSnapshotCache == null) {
             this.replicatedSubscriptionSnapshotCache = new 
ReplicatedSubscriptionSnapshotCache(subName,
-                    
config.getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription());
+                    
config.getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription(),
+                    getCursor().getManagedLedger()::getNumberOfEntries);
         }
 
         if (this.cursor != null) {
@@ -570,7 +571,8 @@ public class PersistentSubscription extends 
AbstractSubscription {
     private void handleReplicatedSubscriptionsUpdate(Position 
markDeletePosition) {
         ReplicatedSubscriptionSnapshotCache snapshotCache = 
this.replicatedSubscriptionSnapshotCache;
         if (snapshotCache != null) {
-            ReplicatedSubscriptionsSnapshot snapshot = 
snapshotCache.advancedMarkDeletePosition(markDeletePosition);
+            ReplicatedSubscriptionSnapshotCache.SnapshotResult snapshot = 
snapshotCache
+                    .advancedMarkDeletePosition(markDeletePosition);
             if (snapshot != null) {
                 topic.getReplicatedSubscriptionController()
                         .ifPresent(c -> c.localSubscriptionUpdated(subName, 
snapshot));
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java
index 6b4e60dc63b..2e5dbbe0c78 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java
@@ -18,14 +18,19 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Range;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.function.ToLongFunction;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.PositionFactory;
 import org.apache.pulsar.common.api.proto.MarkersMessageIdData;
 import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
+import org.apache.pulsar.common.util.StringInterner;
 
 /**
  * Store the last N snapshots that were scanned by a particular subscription.
@@ -33,66 +38,357 @@ import 
org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
 @Slf4j
 public class ReplicatedSubscriptionSnapshotCache {
     private final String subscription;
-    private final NavigableMap<Position, ReplicatedSubscriptionsSnapshot> 
snapshots;
+    private final ToLongFunction<Range<Position>> distanceFunction;
     private final int maxSnapshotToCache;
+    private SnapshotEntry head;
+    private SnapshotEntry tail;
+    private int numberOfSnapshots = 0;
+    private SnapshotEntry lastSortedEntry;
+    private final SortedSet<SnapshotEntry> sortedSnapshots;
 
-    public ReplicatedSubscriptionSnapshotCache(String subscription, int 
maxSnapshotToCache) {
+    public ReplicatedSubscriptionSnapshotCache(String subscription, int 
maxSnapshotToCache,
+                                               ToLongFunction<Range<Position>> 
distanceFunction) {
         this.subscription = subscription;
-        this.snapshots = new TreeMap<>();
+        this.distanceFunction = distanceFunction;
+        if (maxSnapshotToCache < 3) {
+            throw new IllegalArgumentException("maxSnapshotToCache must be >= 
3");
+        }
         this.maxSnapshotToCache = maxSnapshotToCache;
+        this.sortedSnapshots = new TreeSet<>();
+    }
+
+    /**
+     * Memory footprint estimate for one SnapshotEntry with shared String 
cluster instances.
+     *
+     * Assumptions:
+     * - 64-bit JVM with compressed OOPs enabled (default for heap sizes < 
32GB)
+     * - Cluster name strings are shared/interned across entries
+     * - 2 ClusterEntry objects per SnapshotEntry (typical case)
+     * - Each entry has its own Position objects
+     * - 1 of the ClusterEntry objects is for the local cluster and shares the 
local cluster position
+     * - List.of() creates ImmutableCollections.List12 for 2-element lists
+     * - 8-byte memory alignment padding applied where needed
+     *
+     * Breakdown:
+     *
+     * 1. SnapshotEntry object: ~40 bytes
+     *    - Object header (mark + klass): 12 bytes
+     *    - Position position: 4 bytes (reference)
+     *    - List<ClusterEntry> clusters: 4 bytes (reference)
+     *    - long distanceToPrevious: 8 bytes
+     *    - SnapshotEntry next: 4 bytes (reference)
+     *    - SnapshotEntry prev: 4 bytes (reference)
+     *    - Alignment padding: 4 bytes
+     *    Subtotal: 40 bytes
+     *
+     * 2. Position object (snapshot position): ~32 bytes
+     *    - Object header: 12 bytes
+     *    - long ledgerId: 8 bytes
+     *    - long entryId: 8 bytes
+     *    - Alignment padding: 4 bytes
+     *    Subtotal: 32 bytes
+     *
+     * 3. ImmutableCollections.List12 (for 2 elements): ~32 bytes
+     *    - Object header: 12 bytes
+     *    - Object e0: 4 bytes (reference to first ClusterEntry)
+     *    - Object e1: 4 bytes (reference to second ClusterEntry)
+     *    - Alignment padding: 12 bytes
+     *    Subtotal: 32 bytes
+     *
+     * 4. ClusterEntry objects (2 instances): ~48 bytes
+     *    Each ClusterEntry (Java record): ~24 bytes
+     *    - Object header: 12 bytes
+     *    - String cluster: 4 bytes (reference, string itself is 
shared/interned)
+     *    - Position position: 4 bytes (reference)
+     *    - Alignment padding: 4 bytes
+     *    Subtotal per entry: 24 bytes × 2 = 48 bytes
+     *
+     *    With alignment to 8 bytes: 48 → 48 bytes
+     *    Actual total for both: 48 bytes
+     *
+     * 5. Additional Position object (for non-local cluster): ~32 bytes
+     *    - Object header: 12 bytes
+     *    - long ledgerId: 8 bytes
+     *    - long entryId: 8 bytes
+     *    - Alignment padding: 4 bytes
+     *    Subtotal: 32 bytes
+     *
+     * Total per SnapshotEntry: 40 + 32 + 32 + 48 + 32 = ~184 bytes
+     *
+     * Rounded estimate: ~184-192 bytes per entry
+     *
+     * Note: Actual memory consumption may vary based on:
+     * - JVM implementation and version
+     * - Whether compressed OOPs are enabled
+     * - Garbage collection and heap layout
+     * - Runtime optimizations (escape analysis, object allocation elimination)
+     * - Number of clusters per snapshot (this estimate assumes 2)
+     */
+    static class SnapshotEntry implements Comparable<SnapshotEntry> {
+        private final Position position;
+        private final List<ClusterEntry> clusters;
+        private long distanceToPrevious = -1;
+        private SnapshotEntry next;
+        private SnapshotEntry prev;
+
+        SnapshotEntry(Position position, List<ClusterEntry> clusters) {
+            this.position = position;
+            this.clusters = clusters;
+        }
+
+        Position position() {
+            return position;
+        }
+
+        List<ClusterEntry> clusters() {
+            return clusters;
+        }
+
+        long distanceToPrevious() {
+            return distanceToPrevious;
+        }
+
+        void setDistanceToPrevious(long distanceToPrevious) {
+            this.distanceToPrevious = distanceToPrevious;
+        }
+
+        SnapshotEntry next() {
+            return next;
+        }
+
+        void setNext(SnapshotEntry next) {
+            this.next = next;
+        }
+
+        SnapshotEntry prev() {
+            return prev;
+        }
+
+        void setPrev(SnapshotEntry prev) {
+            this.prev = prev;
+        }
+
+        long totalDistance() {
+            return distanceToPrevious + (next != null ? 
next.distanceToPrevious : 0L);
+        }
+
+        @Override
+        public int compareTo(SnapshotEntry o) {
+            int retval = Long.compare(totalDistance(), o.totalDistance());
+            if (retval != 0) {
+                return retval;
+            }
+            retval = position.compareTo(o.position);
+            if (retval != 0) {
+                return retval;
+            }
+            return Integer.compare(System.identityHashCode(this), 
System.identityHashCode(o));
+        }
+
+        @Override
+        public String toString() {
+            return String.format("SnapshotEntry(position=%s, clusters=%s, 
distanceToPrevious=%d)", position, clusters,
+                    distanceToPrevious);
+        }
     }
 
+    public record ClusterEntry(String cluster, Position position) {}
+
+    public record SnapshotResult(Position position, List<ClusterEntry> 
clusters) {}
+
     public synchronized void addNewSnapshot(ReplicatedSubscriptionsSnapshot 
snapshot) {
         MarkersMessageIdData msgId = snapshot.getLocalMessageId();
         Position position = PositionFactory.create(msgId.getLedgerId(), 
msgId.getEntryId());
 
+        if (tail != null && position.compareTo(tail.position) <= 0) {
+            // clear the entries in the cache if the new snapshot is older 
than the last one
+            // this means that the subscription has been reset
+            head = null;
+            tail = null;
+            numberOfSnapshots = 0;
+            sortedSnapshots.clear();
+            lastSortedEntry = null;
+        }
+
+        List<ClusterEntry> clusterEntryList = 
snapshot.getClustersList().stream()
+                .map(cmid -> {
+                    Position clusterPosition =
+                            
PositionFactory.create(cmid.getMessageId().getLedgerId(), 
cmid.getMessageId().getEntryId());
+                    if (clusterPosition.equals(position)) {
+                        // reduce memory usage by sharing the same instance 
for the local cluster
+                        clusterPosition = position;
+                    }
+                    return new 
ClusterEntry(StringInterner.intern(cmid.getCluster()), clusterPosition);
+                })
+                .toList();
+
+        // optimize heap memory consumption of the cache
+        if (clusterEntryList.size() == 2) {
+            clusterEntryList = List.of(clusterEntryList.get(0), 
clusterEntryList.get(1));
+        } else if (clusterEntryList.size() == 3) {
+            clusterEntryList = List.of(clusterEntryList.get(0), 
clusterEntryList.get(1), clusterEntryList.get(2));
+        }
+
+        SnapshotEntry entry = new SnapshotEntry(position, clusterEntryList);
+
         if (log.isDebugEnabled()) {
             log.debug("[{}] Added new replicated-subscription snapshot at {} 
-- {}", subscription, position,
                     snapshot.getSnapshotId());
         }
 
-        snapshots.put(position, snapshot);
+        // append to the doubly-linked list
+        if (head == null) {
+            head = entry;
+            tail = entry;
+            entry.setDistanceToPrevious(0);
+        } else {
+            tail.setNext(entry);
+            entry.setPrev(tail);
+            tail = entry;
+        }
+        numberOfSnapshots++;
 
         // Prune the cache
-        while (snapshots.size() > maxSnapshotToCache) {
-            snapshots.pollFirstEntry();
+        if (numberOfSnapshots > maxSnapshotToCache) {
+            removeSingleEntryWithMinimumTotalDistanceToPreviousAndNext();
+        }
+    }
+
+    private void removeSingleEntryWithMinimumTotalDistanceToPreviousAndNext() {
+        updateSortedEntriesByTotalDistance();
+
+        SnapshotEntry minEntry = sortedSnapshots.first();
+        // Defensive check: minEntry should never be head or tail, as these 
are boundary entries that must be preserved
+        if (minEntry == head || minEntry == tail) {
+            throw new IllegalStateException("minEntry should not be head or 
tail boundary entry");
+        }
+        SnapshotEntry minEntryNext = minEntry.next;
+        SnapshotEntry minEntryPrevious = minEntry.prev;
+
+        // remove minEntry from the sorted set
+        sortedSnapshots.remove(minEntry);
+
+        // remove minEntryPrevious and minEntryNext from the sorted set since 
the distance will be updated
+        if (minEntryNext != tail) {
+            sortedSnapshots.remove(minEntryNext);
+        }
+        if (minEntryPrevious != head) {
+            sortedSnapshots.remove(minEntryPrevious);
+        }
+
+        // remove minEntry from the linked list
+        minEntryPrevious.setNext(minEntryNext);
+        minEntryNext.setPrev(minEntryPrevious);
+        numberOfSnapshots--;
+
+        // handle the case where the entry to remove is the last entry that 
has been sorted
+        if (lastSortedEntry == minEntry) {
+            lastSortedEntry = minEntryPrevious;
+        }
+
+        // update distanceToPrevious for the next entry
+        minEntryNext.setDistanceToPrevious(minEntryNext.distanceToPrevious + 
minEntry.distanceToPrevious);
+
+        // add entries back to the sorted set so that entries up to 
lastSortedEntry are sorted
+        if (minEntryNext != tail) {
+            sortedSnapshots.add(minEntryNext);
+        }
+        if (minEntryPrevious != head) {
+            sortedSnapshots.add(minEntryPrevious);
+        }
+    }
+
+    /**
+     * Maintains a sorted set of entries ordered by their total distance to 
adjacent entries.
+     * This method calculates the 'distanceToPrevious' field for both current 
and next entries before adding them to the
+     * sorted set. Subsequent calls to this method will continue processing 
from where the last entry was added.
+     */
+    private void updateSortedEntriesByTotalDistance() {
+        SnapshotEntry current = lastSortedEntry != null ? lastSortedEntry.next 
: head.next;
+        SnapshotEntry previousLoopEntry = null;
+        while (current != null) {
+            // calculate the distance to the previous snapshot entry
+            if (current.distanceToPrevious == -1) {
+                long distanceToPrevious =
+                        
distanceFunction.applyAsLong(Range.open(current.prev.position, 
current.position));
+                current.setDistanceToPrevious(distanceToPrevious);
+            }
+            // Add the entry to the sorted set, which is sorted by total 
distance to the previous and the next entry.
+            // We cannot add the current entry here since sorting requires 
that the current and next entries have
+            // their distanceToPrevious field set.
+            if (previousLoopEntry != null) {
+                sortedSnapshots.add(previousLoopEntry);
+                lastSortedEntry = previousLoopEntry;
+            }
+            previousLoopEntry = current;
+            current = current.next;
         }
     }
 
     /**
      * Signal that the mark-delete position on the subscription has been 
advanced. If there is a snapshot that
-     * correspond to this position, it will returned, other it will return 
null.
+     * corresponds to this position, it will be returned; otherwise it will 
return null.
      */
-    public synchronized ReplicatedSubscriptionsSnapshot 
advancedMarkDeletePosition(Position pos) {
-        ReplicatedSubscriptionsSnapshot snapshot = null;
-        while (!snapshots.isEmpty()) {
-            Map.Entry<Position, ReplicatedSubscriptionsSnapshot> firstEntry =
-                    snapshots.firstEntry();
-            Position first = firstEntry.getKey();
-            if (first.compareTo(pos) > 0) {
-                // Snapshot is associated which an higher position, so it 
cannot be used now
+    public synchronized SnapshotResult advancedMarkDeletePosition(Position 
pos) {
+        SnapshotEntry snapshot = null;
+
+        SnapshotEntry current = head;
+
+        while (current != null) {
+            if (current.position.compareTo(pos) > 0) {
+                // Snapshot is associated with a higher position, so it cannot 
be used now
                 if (log.isDebugEnabled()) {
-                    log.debug("[{}] Snapshot {} is associated with an higher 
position {} so it cannot be used for mark "
-                            + "delete position {}", subscription, 
firstEntry.getValue(), first, pos);
+                    log.debug("[{}] Snapshot {} is associated with a higher 
position {} so it cannot be used for mark "
+                            + "delete position {}", subscription, current, 
current.position, pos);
                 }
                 break;
-            } else {
-                // This snapshot is potentially good. Continue the search for 
to see if there is a higher snapshot we
-                // can use
-                snapshot = snapshots.pollFirstEntry().getValue();
             }
+            // This snapshot is potentially good. Continue the search to see 
if there is a higher snapshot we
+            // can use
+            snapshot = current;
+            if (current == lastSortedEntry) {
+                lastSortedEntry = null;
+            }
+            current = current.next;
+            head = current;
+            if (head != null) {
+                sortedSnapshots.remove(head);
+            }
+            numberOfSnapshots--;
+        }
+
+        if (head == null) {
+            tail = null;
+        } else {
+            head.setPrev(null);
+            head.setDistanceToPrevious(0L);
         }
 
         if (log.isDebugEnabled()) {
             if (snapshot != null) {
-                log.debug("[{}] Advanced mark-delete position to {} -- found 
snapshot {} at {}:{}", subscription, pos,
-                        snapshot.getSnapshotId(),
-                        snapshot.getLocalMessageId().getLedgerId(),
-                        snapshot.getLocalMessageId().getEntryId());
+                log.debug("[{}] Advanced mark-delete position to {} -- found 
snapshot at {}", subscription, pos,
+                        snapshot.position());
             } else {
                 log.debug("[{}] Advanced mark-delete position to {} -- 
snapshot not found", subscription, pos);
             }
         }
-        return snapshot;
+
+        return snapshot != null ? new SnapshotResult(snapshot.position(), 
snapshot.clusters()) : null;
+    }
+
+    @VisibleForTesting
+    synchronized List<SnapshotEntry> getSnapshots() {
+        List<SnapshotEntry> snapshots = new ArrayList<>(numberOfSnapshots);
+        SnapshotEntry current = head;
+        while (current != null) {
+            snapshots.add(current);
+            current = current.next;
+        }
+        return snapshots;
+    }
+
+    @VisibleForTesting
+    synchronized int size() {
+        return numberOfSnapshots;
     }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
index 7ae48f7976b..5eaa313c3d0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
@@ -47,7 +47,6 @@ import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.MarkerType;
 import org.apache.pulsar.common.api.proto.MarkersMessageIdData;
-import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
 import 
org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshotRequest;
 import 
org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshotResponse;
 import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsUpdate;
@@ -126,20 +125,19 @@ public class ReplicatedSubscriptionsController implements 
AutoCloseable, Topic.P
         }
     }
 
-    public void localSubscriptionUpdated(String subscriptionName, 
ReplicatedSubscriptionsSnapshot snapshot) {
+    public void localSubscriptionUpdated(String subscriptionName,
+                                         
ReplicatedSubscriptionSnapshotCache.SnapshotResult snapshot) {
         if (log.isDebugEnabled()) {
-            log.debug("[{}][{}][{}] Updating subscription to snapshot {}",
-                    topic.getBrokerService().pulsar().getBrokerId(), topic, 
subscriptionName,
-                    snapshot.getClustersList().stream()
-                            .map(cmid -> String.format("%s -> %d:%d", 
cmid.getCluster(),
-                                    cmid.getMessageId().getLedgerId(), 
cmid.getMessageId().getEntryId()))
+            log.debug("[{}][{}] Updating subscription to snapshot {}", topic, 
subscriptionName,
+                    snapshot.clusters().stream()
+                            .map(entry -> String.format("%s -> %s", 
entry.cluster(), entry.position()))
                             .collect(Collectors.toList()));
         }
 
         Map<String, MarkersMessageIdData> clusterIds = new TreeMap<>();
-        for (int i = 0, size = snapshot.getClustersCount(); i < size; i++) {
-            ClusterMessageId cmid = snapshot.getClusterAt(i);
-            clusterIds.put(cmid.getCluster(), cmid.getMessageId());
+        for (ReplicatedSubscriptionSnapshotCache.ClusterEntry cluster : 
snapshot.clusters()) {
+            clusterIds.put(cluster.cluster(), new 
MarkersMessageIdData().setLedgerId(cluster.position().getLedgerId())
+                    .setEntryId(cluster.position().getEntryId()));
         }
 
         ByteBuf subscriptionUpdate = 
Markers.newReplicatedSubscriptionsUpdate(subscriptionName, clusterIds);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 097a4f6de05..3b64b2ecc2c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -1373,6 +1373,8 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     void setupMLAsyncCallbackMocks() {
         ledgerMock = mock(ManagedLedger.class);
         cursorMock = mock(ManagedCursorImpl.class);
+        doReturn(ledgerMock).when(cursorMock).getManagedLedger();
+        doReturn(0L).when(ledgerMock).getNumberOfEntries(any());
         final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
 
         doReturn(new ArrayList<>()).when(ledgerMock).getCursors();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCacheTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCacheTest.java
index 5f6ae405ccd..e1e7e0d4164 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCacheTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCacheTest.java
@@ -18,19 +18,27 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.PositionFactory;
 import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
 import org.testng.annotations.Test;
 
+@Slf4j
 @Test(groups = "broker")
 public class ReplicatedSubscriptionSnapshotCacheTest {
 
     @Test
     public void testSnapshotCache() {
-        ReplicatedSubscriptionSnapshotCache cache = new 
ReplicatedSubscriptionSnapshotCache("my-subscription", 10);
+        ReplicatedSubscriptionSnapshotCache cache =
+                new ReplicatedSubscriptionSnapshotCache("my-subscription", 10, 
range -> 0);
 
         assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(0, 
0)));
         
assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(100, 0)));
@@ -58,13 +66,14 @@ public class ReplicatedSubscriptionSnapshotCacheTest {
 
         assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(0, 
0)));
         assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(1, 
0)));
-        ReplicatedSubscriptionsSnapshot snapshot = 
cache.advancedMarkDeletePosition(PositionFactory.create(1, 1));
+        ReplicatedSubscriptionSnapshotCache.SnapshotResult
+                snapshot = 
cache.advancedMarkDeletePosition(PositionFactory.create(1, 1));
         assertNotNull(snapshot);
-        assertEquals(snapshot.getSnapshotId(), "snapshot-1");
+        assertEquals(snapshot.position(), PositionFactory.create(1, 1));
 
         snapshot = cache.advancedMarkDeletePosition(PositionFactory.create(5, 
6));
         assertNotNull(snapshot);
-        assertEquals(snapshot.getSnapshotId(), "snapshot-5");
+        assertEquals(snapshot.position(), PositionFactory.create(5, 5));
 
         // Snapshots should have been now removed
         assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(2, 
2)));
@@ -73,7 +82,8 @@ public class ReplicatedSubscriptionSnapshotCacheTest {
 
     @Test
     public void testSnapshotCachePruning() {
-        ReplicatedSubscriptionSnapshotCache cache = new 
ReplicatedSubscriptionSnapshotCache("my-subscription", 3);
+        ReplicatedSubscriptionSnapshotCache cache =
+                new ReplicatedSubscriptionSnapshotCache("my-subscription", 3, 
range -> 1);
 
         ReplicatedSubscriptionsSnapshot s1 = new 
ReplicatedSubscriptionsSnapshot()
                 .setSnapshotId("snapshot-1");
@@ -96,14 +106,223 @@ public class ReplicatedSubscriptionSnapshotCacheTest {
         cache.addNewSnapshot(s3);
         cache.addNewSnapshot(s4);
 
-        // Snapshot-1 was already pruned
-        assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(1, 
1)));
-        ReplicatedSubscriptionsSnapshot snapshot = 
cache.advancedMarkDeletePosition(PositionFactory.create(2, 2));
+        ReplicatedSubscriptionSnapshotCache.SnapshotResult
+                snapshot = 
cache.advancedMarkDeletePosition(PositionFactory.create(2, 2));
         assertNotNull(snapshot);
-        assertEquals(snapshot.getSnapshotId(), "snapshot-2");
+        // Snapshot-2 was already pruned
+        assertEquals(snapshot.position(), PositionFactory.create(1, 1));
 
         snapshot = cache.advancedMarkDeletePosition(PositionFactory.create(5, 
5));
         assertNotNull(snapshot);
-        assertEquals(snapshot.getSnapshotId(), "snapshot-4");
+        assertEquals(snapshot.position(), PositionFactory.create(4, 4));
     }
-}
+
+
+    @Test(timeOut = 15_000)
+    public void testSnapshotCachePruningByKeepingEqualDistance() {
+        int maxSnapshotToCache = 10_000;
+        int addSnapshotCount = 1_000_000;
+
+        ReplicatedSubscriptionSnapshotCache cache =
+                new ReplicatedSubscriptionSnapshotCache("my-subscription", 
maxSnapshotToCache,
+                        range -> range.upperEndpoint().getEntryId() - 
range.lowerEndpoint().getEntryId());
+
+        long ledgerIdCluster1 = 1;
+        long entryIdCluster1 = 0;
+        long ledgerIdCluster2 = 2;
+        long entryIdCluster2 = 0;
+        Random random = new Random();
+
+        // create a large number of snapshots where the entry ids move forward 
100 + 0-999 (random) entries at a time
+        for (int i = 0; i < addSnapshotCount; i++) {
+            ReplicatedSubscriptionsSnapshot snapshot = new 
ReplicatedSubscriptionsSnapshot()
+                    .setSnapshotId(UUID.randomUUID().toString());
+            
snapshot.setLocalMessageId().setLedgerId(ledgerIdCluster1).setEntryId(entryIdCluster1);
+            
snapshot.addCluster().setCluster("cluster1").setMessageId().setLedgerId(ledgerIdCluster1)
+                    .setEntryId(entryIdCluster1);
+            
snapshot.addCluster().setCluster("cluster2").setMessageId().setLedgerId(ledgerIdCluster2)
+                    .setEntryId(entryIdCluster2);
+            cache.addNewSnapshot(snapshot);
+            entryIdCluster1 += 100 + random.nextInt(1000);
+            entryIdCluster2 += 100 + random.nextInt(1000);
+        }
+
+        // validate the state of snapshots
+        List<ReplicatedSubscriptionSnapshotCache.SnapshotEntry> snapshots = 
cache.getSnapshots();
+        assertEquals(snapshots.size(), maxSnapshotToCache);
+        ReplicatedSubscriptionSnapshotCache.SnapshotEntry second = 
snapshots.get(1);
+        ReplicatedSubscriptionSnapshotCache.SnapshotEntry secondLast = 
snapshots.get(snapshots.size() - 2);
+        long distance = secondLast.position().getEntryId() - 
second.position().getEntryId();
+        long expectedAverageDistance = distance / snapshots.size();
+
+        long maxDistance = 0;
+        long minDistance = Long.MAX_VALUE;
+        for (int i = 0; i < snapshots.size() - 1; i++) {
+            Position position = snapshots.get(i).position();
+            Position nextPosition = snapshots.get(i + 1).position();
+            long distanceToNext = nextPosition.getEntryId() - 
position.getEntryId();
+            if (log.isDebugEnabled()) {
+                log.debug(i + ": " + position + " -> " + nextPosition + " 
distance to next: " + distanceToNext
+                        + " to previous: " + 
snapshots.get(i).distanceToPrevious());
+            }
+            maxDistance = Math.max(maxDistance, distanceToNext);
+            minDistance = Math.min(minDistance, distanceToNext);
+
+            // ensure that each snapshot is within 2 * expected average 
distance from the previous one
+            ReplicatedSubscriptionSnapshotCache.SnapshotEntry snapshotEntry = 
snapshots.get(i);
+            assertThat(snapshotEntry.distanceToPrevious()).describedAs(
+                            "distance to previous for snapshot entry: %s is 
not expected", snapshotEntry)
+                    .isLessThanOrEqualTo(expectedAverageDistance * 2);
+        }
+
+        log.info("Average distance, expected: {}", expectedAverageDistance);
+        log.info("Min distance: {}", minDistance);
+        log.info("Max distance: {}", maxDistance);
+
+        // check that picking a random markDeletePosition between the second and 
the second-to-last snapshot will result in a
+        // snapshot that is within 2 * expectedAverageDistance from the 
markDeletePosition
+        Position markDeletePosition =
+                PositionFactory.create(ledgerIdCluster1,
+                        second.position().getEntryId() + 
random.nextLong(Math.max(1, distance)));
+
+        
assertThat(cache.advancedMarkDeletePosition(markDeletePosition)).satisfies(snapshotResult
 -> {
+            long snapshotDistance = markDeletePosition.getEntryId() - 
snapshotResult.position().getEntryId();
+            assertThat(snapshotDistance).describedAs("snapshot result: %s 
markDeletePosition: %s", snapshotResult,
+                    
markDeletePosition).isLessThanOrEqualTo(expectedAverageDistance * 2);
+        });
+
+    }
+
+    @Test
+    public void testSnapshotCachePruningScenarios() {
+        ReplicatedSubscriptionSnapshotCache cache = new 
ReplicatedSubscriptionSnapshotCache("my-subscription", 5,
+                range -> range.upperEndpoint().getEntryId() - 
range.lowerEndpoint().getEntryId());
+
+        ReplicatedSubscriptionsSnapshot s1 = new 
ReplicatedSubscriptionsSnapshot();
+        s1.setLocalMessageId().setLedgerId(1).setEntryId(1);
+        cache.addNewSnapshot(s1);
+
+        ReplicatedSubscriptionsSnapshot s2 = new 
ReplicatedSubscriptionsSnapshot();
+        s2.setLocalMessageId().setLedgerId(1).setEntryId(2);
+        cache.addNewSnapshot(s2);
+
+        ReplicatedSubscriptionsSnapshot s3 = new 
ReplicatedSubscriptionsSnapshot();
+        s3.setLocalMessageId().setLedgerId(1).setEntryId(10);
+        cache.addNewSnapshot(s3);
+
+        ReplicatedSubscriptionsSnapshot s4 = new 
ReplicatedSubscriptionsSnapshot();
+        s4.setLocalMessageId().setLedgerId(1).setEntryId(15);
+        cache.addNewSnapshot(s4);
+
+        ReplicatedSubscriptionsSnapshot s5 = new 
ReplicatedSubscriptionsSnapshot();
+        s5.setLocalMessageId().setLedgerId(1).setEntryId(25);
+        cache.addNewSnapshot(s5);
+
+        ReplicatedSubscriptionsSnapshot s6 = new 
ReplicatedSubscriptionsSnapshot();
+        s6.setLocalMessageId().setLedgerId(1).setEntryId(100);
+        cache.addNewSnapshot(s6);
+
+        // s2 should be pruned (special case where head is previous to the 
removed one)
+        assertThat(cache.getSnapshots()).hasSize(5)
+                .allSatisfy(snapshotEntry -> 
assertThat(snapshotEntry.position()).isNotEqualTo(
+                        PositionFactory.create(1, 2)));
+
+        ReplicatedSubscriptionsSnapshot s7 = new 
ReplicatedSubscriptionsSnapshot();
+        s7.setLocalMessageId().setLedgerId(1).setEntryId(110);
+        cache.addNewSnapshot(s7);
+
+        // s3 should be pruned (ordinary case where middle entry is removed)
+        assertThat(cache.getSnapshots()).hasSize(5)
+                .allSatisfy(snapshotEntry -> 
assertThat(snapshotEntry.position()).isNotEqualTo(
+                        PositionFactory.create(1, 10)));
+
+        ReplicatedSubscriptionsSnapshot s8 = new 
ReplicatedSubscriptionsSnapshot();
+        s8.setLocalMessageId().setLedgerId(1).setEntryId(112);
+        cache.addNewSnapshot(s8);
+
+        // s7 should be pruned (special case where tail is after the removed 
one)
+        assertThat(cache.getSnapshots()).hasSize(5)
+                .allSatisfy(snapshotEntry -> 
assertThat(snapshotEntry.position()).isNotEqualTo(
+                        PositionFactory.create(1, 110)));
+
+
+        ReplicatedSubscriptionsSnapshot s9 = new 
ReplicatedSubscriptionsSnapshot();
+        s9.setLocalMessageId().setLedgerId(1).setEntryId(113);
+        cache.addNewSnapshot(s9);
+
+        // s8 should be pruned (check that pruning works after the one before 
the tail was removed)
+        assertThat(cache.getSnapshots()).hasSize(5)
+                .allSatisfy(snapshotEntry -> 
assertThat(snapshotEntry.position()).isNotEqualTo(
+                        PositionFactory.create(1, 112)));
+
+        ReplicatedSubscriptionsSnapshot s10 = new 
ReplicatedSubscriptionsSnapshot();
+        s10.setLocalMessageId().setLedgerId(1).setEntryId(200);
+        cache.addNewSnapshot(s10);
+
+        // s4 should be pruned (check that pruning still works immediately 
after head)
+        assertThat(cache.getSnapshots()).hasSize(5)
+                .allSatisfy(snapshotEntry -> 
assertThat(snapshotEntry.position()).isNotEqualTo(
+                        PositionFactory.create(1, 15)));
+
+        ReplicatedSubscriptionsSnapshot s11 = new 
ReplicatedSubscriptionsSnapshot();
+        // entry id that is before the tail
+        s11.setLocalMessageId().setLedgerId(1).setEntryId(50);
+        cache.addNewSnapshot(s11);
+
+        // all snapshots should be pruned, and s11 should be the only one
+        assertThat(cache.getSnapshots()).hasSize(1)
+                .first().satisfies(snapshotEntry -> 
assertThat(snapshotEntry.position()).isEqualTo(
+                        PositionFactory.create(1, 50)));
+    }
+
+    @Test(timeOut = 15_000)
+    public void testSnapshotCacheStressTest() {
+        int maxSnapshotToCache = 10_000;
+        int addSnapshotCount = 1_000_000;
+
+        ReplicatedSubscriptionSnapshotCache cache =
+                new ReplicatedSubscriptionSnapshotCache("my-subscription", 
maxSnapshotToCache,
+                        range -> range.upperEndpoint().getEntryId() - 
range.lowerEndpoint().getEntryId());
+
+        long ledgerIdCluster1 = 1;
+        long entryIdCluster1 = 0;
+        long ledgerIdCluster2 = 2;
+        long entryIdCluster2 = 0;
+        Random random = new Random();
+
+        int addedSnapshots = 0;
+        long markDeletePositionEntryId = 0;
+        long firstSnapshotEntryId = -1L;
+
+        while (addedSnapshots < addSnapshotCount) {
+            // add a random number of snapshots (1 to 2 * maxSnapshotToCache) to the cache
+            int addInThisRound = 1 + random.nextInt(2 * maxSnapshotToCache);
+            for (int i = 0; i < addInThisRound; i++) {
+                ReplicatedSubscriptionsSnapshot snapshot = new 
ReplicatedSubscriptionsSnapshot()
+                        .setSnapshotId(UUID.randomUUID().toString());
+                
snapshot.setLocalMessageId().setLedgerId(ledgerIdCluster1).setEntryId(entryIdCluster1);
+                
snapshot.addCluster().setCluster("cluster1").setMessageId().setLedgerId(ledgerIdCluster1)
+                        .setEntryId(entryIdCluster1);
+                
snapshot.addCluster().setCluster("cluster2").setMessageId().setLedgerId(ledgerIdCluster2)
+                        .setEntryId(entryIdCluster2);
+                cache.addNewSnapshot(snapshot);
+                if (firstSnapshotEntryId == -1L) {
+                    firstSnapshotEntryId = entryIdCluster1;
+                }
+                addedSnapshots++;
+                entryIdCluster1 += 100 + random.nextInt(1000);
+                entryIdCluster2 += 100 + random.nextInt(1000);
+            }
+            long bound = entryIdCluster1 - firstSnapshotEntryId;
+            if (bound > 0) {
+                markDeletePositionEntryId = firstSnapshotEntryId + 
random.nextLong(bound);
+            } else {
+                markDeletePositionEntryId = firstSnapshotEntryId;
+            }
+            ReplicatedSubscriptionSnapshotCache.SnapshotResult snapshotResult 
= cache.advancedMarkDeletePosition(
+                    PositionFactory.create(ledgerIdCluster1, 
markDeletePositionEntryId));
+            assertNotNull(snapshotResult);
+            firstSnapshotEntryId = -1L;
+        }
+    }
+}
\ No newline at end of file


Reply via email to