Copilot commented on code in PR #25044:
URL: https://github.com/apache/pulsar/pull/25044#discussion_r2613371383
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java:
##########
@@ -18,81 +18,377 @@
*/
package org.apache.pulsar.broker.service.persistent;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Range;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.function.ToLongFunction;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.pulsar.common.api.proto.MarkersMessageIdData;
import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
+import org.apache.pulsar.common.util.StringInterner;
/**
* Store the last N snapshots that were scanned by a particular subscription.
*/
@Slf4j
public class ReplicatedSubscriptionSnapshotCache {
private final String subscription;
- private final NavigableMap<Position, ReplicatedSubscriptionsSnapshot>
snapshots;
+ private final ToLongFunction<Range<Position>> distanceFunction;
private final int maxSnapshotToCache;
+ private SnapshotEntry head;
+ private SnapshotEntry tail;
+ private int numberOfSnapshots = 0;
+ private SnapshotEntry lastSortedEntry;
+ private final SortedSet<SnapshotEntry> sortedSnapshots;
- public ReplicatedSubscriptionSnapshotCache(String subscription, int
maxSnapshotToCache) {
+ public ReplicatedSubscriptionSnapshotCache(String subscription, int
maxSnapshotToCache,
+ ToLongFunction<Range<Position>>
distanceFunction) {
this.subscription = subscription;
- this.snapshots = new TreeMap<>();
+ this.distanceFunction = distanceFunction;
+ if (maxSnapshotToCache < 3) {
+ throw new IllegalArgumentException("maxSnapshotToCache must be >=
3");
+ }
this.maxSnapshotToCache = maxSnapshotToCache;
+ this.sortedSnapshots = new TreeSet<>();
+ }
+
+ /**
+ * Memory footprint estimate for one SnapshotEntry with shared String
cluster instances.
+ *
+ * Assumptions:
+ * - 64-bit JVM with compressed OOPs enabled (default for heap sizes <
32GB)
+ * - Cluster name strings are shared/interned across entries
+ * - 2 ClusterEntry objects per SnapshotEntry (typical case)
+ * - Each entry has its own Position objects
+ * - 1 of the ClusterEntry objects is for the local cluster and shares the
local cluster position
+ * - List.of() creates ImmutableCollections.List12 for 2-element lists
+ * - 8-byte memory alignment padding applied where needed
+ *
+ * Breakdown:
+ *
+ * 1. SnapshotEntry object: ~32 bytes
+ * - Object header (mark + klass): 12 bytes
+ * - Position position: 4 bytes (reference)
+ * - List<ClusterEntry> clusters: 4 bytes (reference)
+ * - long distanceToPrevious: 8 bytes
+ * - SnapshotEntry next: 4 bytes (reference)
+ * - SnapshotEntry prev: 4 bytes (reference)
+ * - Alignment padding: 4 bytes
+ * Subtotal: 40 bytes
+ *
+ * 2. Position object (snapshot position): ~32 bytes
+ * - Object header: 12 bytes
+ * - long ledgerId: 8 bytes
+ * - long entryId: 8 bytes
+ * - Alignment padding: 4 bytes
+ * Subtotal: 32 bytes
+ *
+ * 3. ImmutableCollections.List12 (for 2 elements): ~32 bytes
+ * - Object header: 12 bytes
+ * - Object e0: 4 bytes (reference to first ClusterEntry)
+ * - Object e1: 4 bytes (reference to second ClusterEntry)
+ * - Alignment padding: 12 bytes
+ * Subtotal: 32 bytes
+ *
+ * 4. ClusterEntry objects (2 instances): ~64 bytes
+ * Each ClusterEntry (Java record): ~24 bytes
+ * - Object header: 12 bytes
+ * - String cluster: 4 bytes (reference, string itself is
shared/interned)
+ * - Position position: 4 bytes (reference)
+ * - Alignment padding: 4 bytes
+ * Subtotal per entry: 24 bytes × 2 = 48 bytes
+ *
+ * With alignment to 8 bytes: 48 → 48 bytes
+ * Actual total for both: 48 bytes
+ *
+ * 5. Additional Position object (for non-local cluster): ~32 bytes
+ * - Object header: 12 bytes
+ * - long ledgerId: 8 bytes
+ * - long entryId: 8 bytes
+ * - Alignment padding: 4 bytes
+ * Subtotal: 32 bytes
+ *
+ * Total per SnapshotEntry: 40 + 32 + 32 + 48 + 32 = ~184 bytes
+ *
+ * Rounded estimate: ~184-192 bytes per entry
Review Comment:
Item 5 of the memory footprint breakdown (Additional Position object) counts
32 bytes, but that object is only needed when a cluster's position differs
from the local position. The assumptions already state that "1 of the
ClusterEntry objects is for the local cluster and shares the local cluster
position", so the local cluster needs no extra Position object. The 32 bytes
therefore apply per non-local cluster, not as a flat addition. With 2 clusters
in total (1 local, 1 remote) the calculated total is correct, but the comment
should make the per-cluster scaling explicit.
```suggestion
* 5. Additional Position object (for each non-local cluster): ~32 bytes per non-local cluster
* - Object header: 12 bytes
* - long ledgerId: 8 bytes
* - long entryId: 8 bytes
* - Alignment padding: 4 bytes
* Subtotal: 32 bytes × (number of non-local clusters)
*
* Total per SnapshotEntry (assuming 2 clusters: 1 local, 1 remote): 40 + 32 + 32 + 48 + 32 = ~184 bytes
* (For N clusters: 40 + 32 + 32 + 24×N + 32×(N-1))
*
* Rounded estimate: ~184-192 bytes per entry (for 2 clusters)
```
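To sanity-check the arithmetic in the suggestion, here is a minimal sketch
that evaluates the proposed formula 40 + 32 + 32 + 24×N + 32×(N-1). The class
name `SnapshotFootprintEstimate` and the method `estimateBytes` are
hypothetical and not part of the PR. The per-object sizes are copied as-is
from the Javadoc breakdown rather than measured, and the 32-byte list figure
is the one the Javadoc gives for a 2-element `List.of()` list, so results for
other cluster counts are only as good as that assumption.

```java
public final class SnapshotFootprintEstimate {
    // Sizes taken verbatim from the Javadoc breakdown under review
    // (64-bit JVM, compressed OOPs); they are assumptions, not measurements.
    static final long SNAPSHOT_ENTRY_BYTES = 40;
    static final long POSITION_BYTES = 32;
    static final long CLUSTER_LIST_BYTES = 32; // figure quoted for the 2-element list
    static final long CLUSTER_ENTRY_BYTES = 24;

    private SnapshotFootprintEstimate() {
    }

    /**
     * Evaluates 40 + 32 + 32 + 24*N + 32*(N-1) for N clusters, where exactly
     * one cluster is local and shares the snapshot's Position object.
     */
    public static long estimateBytes(int clusters) {
        if (clusters < 1) {
            throw new IllegalArgumentException("clusters must be >= 1");
        }
        long nonLocalClusters = clusters - 1L;
        return SNAPSHOT_ENTRY_BYTES
                + POSITION_BYTES                          // snapshot position
                + CLUSTER_LIST_BYTES                      // list holding the ClusterEntry refs
                + CLUSTER_ENTRY_BYTES * clusters          // one ClusterEntry per cluster
                + POSITION_BYTES * nonLocalClusters;      // extra Position per remote cluster
    }

    public static void main(String[] args) {
        System.out.println(estimateBytes(2)); // 184, matching the Javadoc total
        System.out.println(estimateBytes(3)); // 240 under the same assumptions
    }
}
```

For 2 clusters this reproduces the ~184 bytes stated in the Javadoc. For
larger N the list term is probably understated, since the Javadoc's list
figure is specific to 2-element lists.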