Denovo1998 commented on code in PR #25984:
URL: https://github.com/apache/pulsar/pull/25984#discussion_r3396557389


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java:
##########
@@ -816,4 +840,58 @@ public Map<String, TopicMetricBean> genTopicMetricMap() {
         stats.recordBucketSnapshotSizeBytes(totalSnapshotLength.longValue());
         return stats.genTopicMetricMap();
     }
+
+    /**
+     * Delete orphaned bucket snapshots whose ledger range lies entirely before the earliest
+     * surviving ledger. Buckets are deleted sequentially; the chain stops on first failure
+     * to avoid wasted work when storage is unavailable.
+     */
+    private synchronized CompletableFuture<Void> asyncTrimImmutableBuckets() {
+        ManagedLedger ledger = context.getCursor().getManagedLedger();
+        if (ledger == null || ledger.getLedgersInfo().isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+        Long firstLedgerId = ledger.getLedgersInfo().firstKey();
+        if (null == firstLedgerId) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        Map<Range<Long>, ImmutableBucket> toBeDeletedBuckets = immutableBuckets.asMapOfRanges().entrySet().stream()
+                .filter(e -> e.getKey().upperEndpoint() < firstLedgerId)
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+        if (toBeDeletedBuckets.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        String ledgerName = ledger.getName();
+        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
+        for (Map.Entry<Range<Long>, ImmutableBucket> entry : toBeDeletedBuckets.entrySet()) {
+            chain = chain.thenCompose(__ ->
+                    deleteBucketSnapshot(ledgerName, entry.getKey(), entry.getValue()));
+        }
+        return chain;
+    }
+
+    private CompletableFuture<Void> deleteBucketSnapshot(String ledgerName,
+                                                          Range<Long> range, ImmutableBucket bucket) {
+        synchronized (this) {
+            immutableBuckets.remove(range);
+            numberDelayedMessages.addAndGet(-bucket.getNumberBucketDelayedMessages());
+        }

Review Comment:
   Removing a bucket from immutableBuckets might not be enough. When a mutable 
bucket is sealed, the first snapshot segment has already been loaded into 
sharedBucketPriorityQueue, and snapshotSegmentLastIndexMap can still point to 
this bucket. After this trim removes the range and decrements 
numberDelayedMessages, getScheduledMessages() can still pop those queued 
entries and return positions from ledgers that have already been deleted, then 
decrement numberDelayedMessages again.
   
   Could we either purge the in-memory state for the trimmed bucket under the 
same lock, such as rebuilding sharedBucketPriorityQueue and removing matching 
snapshotSegmentLastIndexMap entries, or otherwise avoid trimming buckets that 
still have loaded segments? Please also add a test that advances the clock 
after trim and asserts no positions below firstLedgerId are returned and the 
delayed-message counter remains consistent.
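
   To make the first option concrete, here is a minimal sketch on a toy model. It is not the tracker's code: `TrimPurgeSketch`, `Entry`, and `trimBefore` are hypothetical names, a `java.util.PriorityQueue` stands in for sharedBucketPriorityQueue, and a map keyed by ledger id stands in for snapshotSegmentLastIndexMap. It uses `removeIf` in place of a rebuild, and it assumes the counter tracks only queued entries, so the real decrement would still need to be reconciled with the bucket's own message count.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

/** Toy model of the tracker's in-memory state; none of these types are Pulsar's. */
final class TrimPurgeSketch {

    /** A queued delayed message: delivery time plus the position it points to. */
    static final class Entry {
        final long deliverAt;
        final long ledgerId;
        final long entryId;

        Entry(long deliverAt, long ledgerId, long entryId) {
            this.deliverAt = deliverAt;
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }
    }

    // Stand-in for sharedBucketPriorityQueue, ordered by delivery time.
    private final PriorityQueue<Entry> sharedQueue =
            new PriorityQueue<>(Comparator.comparingLong((Entry e) -> e.deliverAt));
    // Stand-in for snapshotSegmentLastIndexMap; keyed by ledger id for brevity (assumption).
    private final Map<Long, Integer> lastSegmentIndex = new HashMap<>();
    private long numberDelayedMessages;

    synchronized void add(long deliverAt, long ledgerId, long entryId) {
        sharedQueue.add(new Entry(deliverAt, ledgerId, entryId));
        lastSegmentIndex.put(ledgerId, 0);
        numberDelayedMessages++;
    }

    /** Purges every trace of ledgers below firstLedgerId under one lock; returns the purge count. */
    synchronized int trimBefore(long firstLedgerId) {
        int sizeBefore = sharedQueue.size();
        sharedQueue.removeIf(e -> e.ledgerId < firstLedgerId);
        lastSegmentIndex.keySet().removeIf(id -> id < firstLedgerId);
        int purged = sizeBefore - sharedQueue.size();
        numberDelayedMessages -= purged; // decremented once, at trim time
        return purged;
    }

    /** Pops entries due at or before now; each pop decrements the counter exactly once. */
    synchronized List<Entry> getScheduledMessages(long now) {
        List<Entry> due = new ArrayList<>();
        while (!sharedQueue.isEmpty() && sharedQueue.peek().deliverAt <= now) {
            due.add(sharedQueue.poll());
            numberDelayedMessages--;
        }
        return due;
    }

    synchronized long getNumberOfDelayedMessages() {
        return numberDelayedMessages;
    }
}
```

   Because the queue, the index map, and the counter are updated in the same synchronized method, a later getScheduledMessages() call in this model cannot observe a trimmed ledger or decrement the counter a second time.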



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java:
##########
@@ -469,4 +471,137 @@ public void testClear(BucketDelayedDeliveryTracker tracker)
 
       tracker.close();
     }
+
+    private static class TrackerWithStorage {
+        final BucketDelayedDeliveryTracker tracker;
+        final MockBucketSnapshotStorage storage;
+
+        TrackerWithStorage(BucketDelayedDeliveryTracker tracker, MockBucketSnapshotStorage storage) {
+            this.tracker = tracker;
+            this.storage = storage;
+        }
+
+        void close() throws Exception {
+            tracker.close();
+            storage.close();
+        }
+    }
+
+    private TrackerWithStorage createTrackerWithMockLedger(long firstLedgerId, int maxNumBuckets)
+            throws Exception {
+        MockBucketSnapshotStorage storage = new MockBucketSnapshotStorage();
+        storage.start();
+
+        ManagedLedger mockLedger = mock(ManagedLedger.class);
+        NavigableMap<Long, LedgerInfo> ledgerInfo = new TreeMap<>();
+        ledgerInfo.put(firstLedgerId, mock(LedgerInfo.class));
+        when(mockLedger.getLedgersInfo()).thenReturn(ledgerInfo);
+        when(mockLedger.getName()).thenReturn("test-ledger");
+
+        ManagedCursor mockCursor = new MockManagedCursor("test-cursor") {
+            @Override
+            public ManagedLedger getManagedLedger() {
+                return mockLedger;
+            }
+        };
+
+        AbstractPersistentDispatcherMultipleConsumers disp =
+                mock(AbstractPersistentDispatcherMultipleConsumers.class);
+        Clock mockClock = mock(Clock.class);
+        AtomicLong mockClockTime = new AtomicLong();
+        when(mockClock.millis()).then(x -> mockClockTime.get());
+        doReturn(mockCursor).when(disp).getCursor();
+        doReturn("persistent://public/default/testDelay" + " / " + mockCursor.getName()).when(disp).getName();
+
+        BucketDelayedDeliveryTracker tracker = new BucketDelayedDeliveryTracker(disp, mock(Timer.class),
+                100000, mockClock, true, storage, 5, TimeUnit.MILLISECONDS.toMillis(10), -1, maxNumBuckets);
+        return new TrackerWithStorage(tracker, storage);
+    }
+
+    @Test
+    public void testTrimRemovesOrphanedBuckets() throws Exception {
+        TrackerWithStorage ts = createTrackerWithMockLedger(50L, 5);
+
+        for (int i = 1; i <= 31; i++) {
+            ts.tracker.addMessage(i, i, i * 10);
+        }
+        Awaitility.await().untilAsserted(() ->
+                Assert.assertTrue(ts.tracker.getImmutableBuckets().asMapOfRanges().values().stream()
+                        .noneMatch(x -> x.merging)));
+
+        int bucketCount = ts.tracker.getImmutableBuckets().asMapOfRanges().size();
+        assertTrue(bucketCount <= 5,
+                "Bucket count " + bucketCount + " should be <= maxNumBuckets=5 after trim+merge");
+
+        ts.tracker.getImmutableBuckets().asMapOfRanges().forEach((range, bucket) ->
+                assertTrue(range.lowerEndpoint() >= 50L,
+                        "Remaining bucket range " + range + " should be >= 50"));
+

Review Comment:
   This test only checks that immutableBuckets no longer contains ranges below 
firstLedgerId. It does not verify the externally visible behavior after trim. 
Please advance the mock clock and call getScheduledMessages(), then assert that 
no position from ledgers below firstLedgerId is returned and that 
getNumberOfDelayedMessages() remains consistent. That would catch stale entries 
left in sharedBucketPriorityQueue after the bucket range is removed.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java:
##########
@@ -754,12 +769,21 @@ public boolean shouldPauseAllDeliveries() {
 
     @Override
     public synchronized CompletableFuture<Void> clear() {
-        CompletableFuture<Void> future = cleanImmutableBuckets();
-        sharedBucketPriorityQueue.clear();
-        lastMutableBucket.clear();
-        snapshotSegmentLastIndexMap.clear();
-        numberDelayedMessages.set(0);
-        return future;
+        // Wait for any in-flight trim+merge to settle, then clear.
+        // Reuse trimFuture to block new triggers until the clear chain completes.
+        CompletableFuture<Void> before = trimFuture != null && !trimFuture.isDone()
+                ? trimFuture : CompletableFuture.completedFuture(null);
+        trimFuture = before.thenCompose(__ -> {

Review Comment:
   If clear() is called while trimFuture is still in flight, and that 
trim/merge chain later completes exceptionally, this before.thenCompose(...) 
will not execute the clear block. The whenComplete in addMessage only logs the 
failure and preserves the exceptional completion, so clear() can return 
exceptionally while leaving immutableBuckets, sharedBucketPriorityQueue, 
lastMutableBucket, snapshotSegmentLastIndexMap, and numberDelayedMessages 
uncleared.
   
   Could we normalize the previous future before chaining clear, for example 
with handle/exceptionally, so clear always runs after the in-flight trim/merge 
settles?
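
   A minimal sketch of that normalization, under stated assumptions: `ClearAfterTrimSketch`, `setInFlightTrim`, `doClear`, and `isCleared` are hypothetical names, and the `cleared` flag stands in for resetting the tracker's real collections. The only point is that `handle` maps both a normal and an exceptional completion of the previous future to a normal one, so the chained clear step always runs.

```java
import java.util.concurrent.CompletableFuture;

/** Toy model of clear() chained behind an in-flight trim; not the tracker's actual code. */
final class ClearAfterTrimSketch {
    // Mirrors the role of trimFuture in the diff (hypothetical simplification).
    private CompletableFuture<Void> trimFuture;
    // Stand-in for "all in-memory state has been reset".
    private boolean cleared;

    synchronized void setInFlightTrim(CompletableFuture<Void> inFlight) {
        this.trimFuture = inFlight;
    }

    synchronized CompletableFuture<Void> clear() {
        CompletableFuture<Void> before = trimFuture != null && !trimFuture.isDone()
                ? trimFuture : CompletableFuture.completedFuture(null);
        // handle() runs on success and on failure, and its result completes normally,
        // so a failed trim no longer short-circuits the clear step below.
        CompletableFuture<Void> settled = before.handle((ignored, error) -> null);
        trimFuture = settled.thenCompose(ignored -> doClear());
        return trimFuture;
    }

    private synchronized CompletableFuture<Void> doClear() {
        cleared = true; // the real method would reset the queue, maps, and counter here
        return CompletableFuture.completedFuture(null);
    }

    synchronized boolean isCleared() {
        return cleared;
    }
}
```

   In this sketch the trim failure is swallowed before clear runs; if the failure should still be surfaced, it would need to be logged or recorded inside the `handle` callback.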


