dao-jun commented on code in PR #25984:
URL: https://github.com/apache/pulsar/pull/25984#discussion_r3398448012


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java:
##########
@@ -816,4 +840,58 @@ public Map<String, TopicMetricBean> genTopicMetricMap() {
         stats.recordBucketSnapshotSizeBytes(totalSnapshotLength.longValue());
         return stats.genTopicMetricMap();
     }
+
+    /**
+     * Delete orphaned bucket snapshots whose ledger range lies entirely before the earliest
+     * surviving ledger. Buckets are deleted sequentially; the chain stops on first failure
+     * to avoid wasted work when storage is unavailable.
+     */
+    private synchronized CompletableFuture<Void> asyncTrimImmutableBuckets() {
+        ManagedLedger ledger = context.getCursor().getManagedLedger();
+        if (ledger == null || ledger.getLedgersInfo().isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+        Long firstLedgerId = ledger.getLedgersInfo().firstKey();
+        if (null == firstLedgerId) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        Map<Range<Long>, ImmutableBucket> toBeDeletedBuckets = immutableBuckets.asMapOfRanges().entrySet().stream()
+                .filter(e -> e.getKey().upperEndpoint() < firstLedgerId)
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+        if (toBeDeletedBuckets.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        String ledgerName = ledger.getName();
+        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
+        for (Map.Entry<Range<Long>, ImmutableBucket> entry : toBeDeletedBuckets.entrySet()) {
+            chain = chain.thenCompose(__ ->
+                    deleteBucketSnapshot(ledgerName, entry.getKey(), entry.getValue()));
+        }
+        return chain;
+    }
+
+    private CompletableFuture<Void> deleteBucketSnapshot(String ledgerName,
+                                                          Range<Long> range, ImmutableBucket bucket) {
+        synchronized (this) {
+            immutableBuckets.remove(range);
+            numberDelayedMessages.addAndGet(-bucket.getNumberBucketDelayedMessages());
+        }

Review Comment:
   Not clearing `sharedBucketPriorityQueue` is intentional: 
`ManagedCursorImpl.asyncReplayEntries` will filter out invalid positions.
   
   I did miss cleaning up `snapshotSegmentLastIndexMap`; that is a problem.
   
   Decrementing `numberDelayedMessages` twice is indeed a problem. Good catch.
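
One possible shape for addressing the last two points, as a minimal sketch rather than the actual fix: remove the bucket, clean its index entry, and adjust the counter in one synchronized step, and only when this call really removed the bucket. Everything below is a hypothetical stand-in (`BucketBookkeepingSketch`, `Bucket`, and a `TreeMap` keyed by start ledger id in place of the real `RangeMap`); the way `snapshotSegmentLastIndexMap` is keyed here is simplified and is an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for the tracker's bucket bookkeeping; not Pulsar code. */
class BucketBookkeepingSketch {

    /** Hypothetical bucket: a ledger range plus the number of delayed messages it holds. */
    static final class Bucket {
        final long startLedgerId;
        final long endLedgerId;
        final long delayedMessages;

        Bucket(long startLedgerId, long endLedgerId, long delayedMessages) {
            this.startLedgerId = startLedgerId;
            this.endLedgerId = endLedgerId;
            this.delayedMessages = delayedMessages;
        }
    }

    // Stands in for immutableBuckets (assumption: keyed by start ledger id).
    private final TreeMap<Long, Bucket> buckets = new TreeMap<>();
    // Stands in for snapshotSegmentLastIndexMap (assumption: simplified keying).
    private final Map<Long, Integer> lastSegmentIndex = new HashMap<>();
    private final AtomicLong numberDelayedMessages = new AtomicLong();

    synchronized void add(Bucket bucket, int lastIndex) {
        buckets.put(bucket.startLedgerId, bucket);
        lastSegmentIndex.put(bucket.startLedgerId, lastIndex);
        numberDelayedMessages.addAndGet(bucket.delayedMessages);
    }

    /**
     * Removes the bucket and its derived state exactly once.
     * Returns false, and changes nothing, if the bucket was already removed.
     */
    synchronized boolean remove(Bucket bucket) {
        // Map.remove(key, value) only succeeds while this exact bucket is still mapped,
        // so a second caller cannot decrement the counter again.
        if (!buckets.remove(bucket.startLedgerId, bucket)) {
            return false;
        }
        lastSegmentIndex.remove(bucket.startLedgerId);
        numberDelayedMessages.addAndGet(-bucket.delayedMessages);
        return true;
    }

    long delayedMessages() {
        return numberDelayedMessages.get();
    }

    synchronized boolean hasSegmentIndex(long startLedgerId) {
        return lastSegmentIndex.containsKey(startLedgerId);
    }
}
```

With this shape, any later cleanup path that also tries to remove the same bucket becomes a no-op instead of a second decrement.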
   
   


