Denovo1998 commented on code in PR #25984:
URL: https://github.com/apache/pulsar/pull/25984#discussion_r3396557389
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java:
##########
@@ -816,4 +840,58 @@ public Map<String, TopicMetricBean> genTopicMetricMap() {
stats.recordBucketSnapshotSizeBytes(totalSnapshotLength.longValue());
return stats.genTopicMetricMap();
}
+
+ /**
+ * Delete orphaned bucket snapshots whose ledger range lies entirely before the earliest
+ * surviving ledger. Buckets are deleted sequentially; the chain stops on first failure
+ * to avoid wasted work when storage is unavailable.
+ */
+ private synchronized CompletableFuture<Void> asyncTrimImmutableBuckets() {
+ ManagedLedger ledger = context.getCursor().getManagedLedger();
+ if (ledger == null || ledger.getLedgersInfo().isEmpty()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ Long firstLedgerId = ledger.getLedgersInfo().firstKey();
+ if (null == firstLedgerId) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ Map<Range<Long>, ImmutableBucket> toBeDeletedBuckets = immutableBuckets.asMapOfRanges().entrySet().stream()
+ .filter(e -> e.getKey().upperEndpoint() < firstLedgerId)
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+ if (toBeDeletedBuckets.isEmpty()) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ String ledgerName = ledger.getName();
+ CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
+ for (Map.Entry<Range<Long>, ImmutableBucket> entry : toBeDeletedBuckets.entrySet()) {
+ chain = chain.thenCompose(__ ->
+ deleteBucketSnapshot(ledgerName, entry.getKey(), entry.getValue()));
+ }
+ return chain;
+ }
+
+ private CompletableFuture<Void> deleteBucketSnapshot(String ledgerName,
+ Range<Long> range, ImmutableBucket bucket) {
+ synchronized (this) {
+ immutableBuckets.remove(range);
+ numberDelayedMessages.addAndGet(-bucket.getNumberBucketDelayedMessages());
+ }
Review Comment:
Removing a bucket from immutableBuckets might not be enough. When a mutable
bucket is sealed, the first snapshot segment has already been loaded into
sharedBucketPriorityQueue, and snapshotSegmentLastIndexMap can still point to
this bucket. After this trim removes the range and decrements
numberDelayedMessages, getScheduledMessages() can still pop those queued
entries and return positions from ledgers that have already been deleted, then
decrement numberDelayedMessages again.
Could we either purge the in-memory state for the trimmed bucket under the
same lock (for example, by rebuilding sharedBucketPriorityQueue and removing the
matching snapshotSegmentLastIndexMap entries), or otherwise avoid trimming
buckets that still have loaded segments? Please also add a test that advances
the clock after trim and asserts that no positions below firstLedgerId are
returned and that the delayed-message counter remains consistent.
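A minimal sketch of the first option (purging the trimmed bucket's in-memory state under the same lock) follows. It is a simplified, stdlib-only model, not Pulsar's code: `TrimSketch`, `Bucket`, `Entry`, `remaining`, `loadEntry`, `trimBefore` and `getScheduledLedgers` are hypothetical names, and while the fields reuse the names from the comment, their types are stand-ins. It assumes a bucket's count includes entries already loaded into the shared queue, so the counter is decremented once at trim time and the purged entries can never be popped and decremented a second time.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified model of the tracker state; not Pulsar's actual classes.
class TrimSketch {
    // A sealed bucket covering ledgers [startLedger, endLedger].
    static final class Bucket {
        final long startLedger;
        final long endLedger;
        long remaining; // assumption: undelivered messages, including ones already loaded into the queue

        Bucket(long startLedger, long endLedger, long remaining) {
            this.startLedger = startLedger;
            this.endLedger = endLedger;
            this.remaining = remaining;
        }

        boolean covers(long ledgerId) {
            return ledgerId >= startLedger && ledgerId <= endLedger;
        }
    }

    // One queued delayed message: delivery time plus (ledgerId, entryId).
    static final class Entry {
        final long deliverAt;
        final long ledgerId;
        final long entryId;

        Entry(long deliverAt, long ledgerId, long entryId) {
            this.deliverAt = deliverAt;
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }
    }

    private final TreeMap<Long, Bucket> immutableBuckets = new TreeMap<>(); // keyed by startLedger
    private final PriorityQueue<Entry> sharedBucketPriorityQueue =
            new PriorityQueue<>(Comparator.comparingLong((Entry e) -> e.deliverAt));
    // (ledgerId, entryId) of a loaded segment's last index -> owning bucket
    private final Map<List<Long>, Bucket> snapshotSegmentLastIndexMap = new HashMap<>();
    final AtomicLong numberDelayedMessages = new AtomicLong();

    synchronized void addBucket(Bucket b) {
        immutableBuckets.put(b.startLedger, b);
        numberDelayedMessages.addAndGet(b.remaining);
    }

    // Simulates loading one snapshot-segment entry into the shared queue.
    synchronized void loadEntry(Bucket b, long deliverAt, long ledgerId, long entryId) {
        sharedBucketPriorityQueue.add(new Entry(deliverAt, ledgerId, entryId));
        snapshotSegmentLastIndexMap.put(List.of(ledgerId, entryId), b);
    }

    // Drops every bucket below firstLedgerId together with its queued and indexed entries.
    synchronized void trimBefore(long firstLedgerId) {
        List<Bucket> trimmed = new ArrayList<>();
        immutableBuckets.values().removeIf(b -> {
            boolean orphaned = b.endLedger < firstLedgerId;
            if (orphaned) {
                trimmed.add(b);
            }
            return orphaned;
        });
        if (trimmed.isEmpty()) {
            return;
        }
        sharedBucketPriorityQueue.removeIf(e -> trimmed.stream().anyMatch(b -> b.covers(e.ledgerId)));
        snapshotSegmentLastIndexMap.values().removeIf(trimmed::contains);
        for (Bucket b : trimmed) {
            numberDelayedMessages.addAndGet(-b.remaining); // single decrement per trimmed bucket
        }
    }

    // Pops due entries and returns their ledger ids, decrementing the counters once per entry.
    synchronized List<Long> getScheduledLedgers(long now) {
        List<Long> due = new ArrayList<>();
        while (!sharedBucketPriorityQueue.isEmpty() && sharedBucketPriorityQueue.peek().deliverAt <= now) {
            Entry e = sharedBucketPriorityQueue.poll();
            due.add(e.ledgerId);
            numberDelayedMessages.decrementAndGet();
            Map.Entry<Long, Bucket> owner = immutableBuckets.floorEntry(e.ledgerId);
            if (owner != null && owner.getValue().covers(e.ledgerId)) {
                owner.getValue().remaining--;
            }
        }
        return due;
    }
}
```

The other option in the comment, skipping buckets that still have loaded segments, could amount to filtering the trim candidates on whether any `snapshotSegmentLastIndexMap` value still refers to the bucket; that trades completeness of the trim for not having to touch the shared queue.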
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java:
##########
@@ -469,4 +471,137 @@ public void testClear(BucketDelayedDeliveryTracker tracker)
tracker.close();
}
+
+ private static class TrackerWithStorage {
+ final BucketDelayedDeliveryTracker tracker;
+ final MockBucketSnapshotStorage storage;
+
+ TrackerWithStorage(BucketDelayedDeliveryTracker tracker, MockBucketSnapshotStorage storage) {
+ this.tracker = tracker;
+ this.storage = storage;
+ }
+
+ void close() throws Exception {
+ tracker.close();
+ storage.close();
+ }
+ }
+
+ private TrackerWithStorage createTrackerWithMockLedger(long firstLedgerId, int maxNumBuckets)
+ throws Exception {
+ MockBucketSnapshotStorage storage = new MockBucketSnapshotStorage();
+ storage.start();
+
+ ManagedLedger mockLedger = mock(ManagedLedger.class);
+ NavigableMap<Long, LedgerInfo> ledgerInfo = new TreeMap<>();
+ ledgerInfo.put(firstLedgerId, mock(LedgerInfo.class));
+ when(mockLedger.getLedgersInfo()).thenReturn(ledgerInfo);
+ when(mockLedger.getName()).thenReturn("test-ledger");
+
+ ManagedCursor mockCursor = new MockManagedCursor("test-cursor") {
+ @Override
+ public ManagedLedger getManagedLedger() {
+ return mockLedger;
+ }
+ };
+
+ AbstractPersistentDispatcherMultipleConsumers disp =
+ mock(AbstractPersistentDispatcherMultipleConsumers.class);
+ Clock mockClock = mock(Clock.class);
+ AtomicLong mockClockTime = new AtomicLong();
+ when(mockClock.millis()).then(x -> mockClockTime.get());
+ doReturn(mockCursor).when(disp).getCursor();
+ doReturn("persistent://public/default/testDelay" + " / " + mockCursor.getName()).when(disp).getName();
+
+ BucketDelayedDeliveryTracker tracker = new BucketDelayedDeliveryTracker(disp, mock(Timer.class),
+ 100000, mockClock, true, storage, 5, TimeUnit.MILLISECONDS.toMillis(10), -1, maxNumBuckets);
+ return new TrackerWithStorage(tracker, storage);
+ }
+
+ @Test
+ public void testTrimRemovesOrphanedBuckets() throws Exception {
+ TrackerWithStorage ts = createTrackerWithMockLedger(50L, 5);
+
+ for (int i = 1; i <= 31; i++) {
+ ts.tracker.addMessage(i, i, i * 10);
+ }
+ Awaitility.await().untilAsserted(() ->
+ Assert.assertTrue(ts.tracker.getImmutableBuckets().asMapOfRanges().values().stream()
+ .noneMatch(x -> x.merging)));
+
+ int bucketCount = ts.tracker.getImmutableBuckets().asMapOfRanges().size();
+ assertTrue(bucketCount <= 5,
+ "Bucket count " + bucketCount + " should be <= maxNumBuckets=5 after trim+merge");
+
+ ts.tracker.getImmutableBuckets().asMapOfRanges().forEach((range, bucket) ->
+ assertTrue(range.lowerEndpoint() >= 50L,
+ "Remaining bucket range " + range + " should be >= 50"));
+
Review Comment:
This test only checks that immutableBuckets no longer contains ranges below
firstLedgerId. It does not verify the externally visible behavior after trim.
Please advance the mock clock and call getScheduledMessages(), then assert that
no position from ledgers below firstLedgerId is returned and that
getNumberOfDelayedMessages() remains consistent. That would catch stale entries
left in sharedBucketPriorityQueue after the bucket range is removed.
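The two assertions requested here can be packaged as a small helper, shown as a stdlib-only sketch. `TrimInvariants`, `Pos` and `check` are hypothetical names, not Pulsar APIs, and the counter rule assumes that each returned position decrements the delayed-message counter exactly once.

```java
import java.util.Collection;

// Hypothetical helper for the requested test; not part of Pulsar.
final class TrimInvariants {
    // Minimal stand-in for a (ledgerId, entryId) position.
    static final class Pos {
        final long ledgerId;
        final long entryId;

        Pos(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public String toString() {
            return ledgerId + ":" + entryId;
        }
    }

    private TrimInvariants() {
    }

    /**
     * Returns a description of the first violated invariant, or null if none is violated.
     * Assumes each returned position decrements the delayed-message counter exactly once.
     */
    static String check(Collection<Pos> scheduled, long firstLedgerId,
                        long delayedBefore, long delayedAfter) {
        // 1. Nothing from a trimmed ledger may be handed to the dispatcher.
        for (Pos p : scheduled) {
            if (p.ledgerId < firstLedgerId) {
                return "stale position " + p + " from a trimmed ledger (< " + firstLedgerId + ")";
            }
        }
        // 2. The counter must drop by exactly the number of delivered positions.
        long expected = delayedBefore - scheduled.size();
        if (delayedAfter != expected) {
            return "counter drift: expected " + expected + " but was " + delayedAfter;
        }
        return null;
    }
}
```

In the test, one could record `getNumberOfDelayedMessages()`, advance the mock clock past every delivery time, call `getScheduledMessages(Integer.MAX_VALUE)`, map each returned position to `Pos`, and assert that `check(...)` returns null. Advancing the clock assumes `createTrackerWithMockLedger` is changed to expose `mockClockTime`, which it currently keeps local.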
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java:
##########
@@ -754,12 +769,21 @@ public boolean shouldPauseAllDeliveries() {
@Override
public synchronized CompletableFuture<Void> clear() {
- CompletableFuture<Void> future = cleanImmutableBuckets();
- sharedBucketPriorityQueue.clear();
- lastMutableBucket.clear();
- snapshotSegmentLastIndexMap.clear();
- numberDelayedMessages.set(0);
- return future;
+ // Wait for any in-flight trim+merge to settle, then clear.
+ // Reuse trimFuture to block new triggers until the clear chain completes.
+ CompletableFuture<Void> before = trimFuture != null && !trimFuture.isDone()
+ ? trimFuture : CompletableFuture.completedFuture(null);
+ trimFuture = before.thenCompose(__ -> {
+ trimFuture = before.thenCompose(__ -> {
Review Comment:
If clear() is called while trimFuture is still in flight, and that
trim/merge chain later completes exceptionally, this before.thenCompose(...)
will not execute the clear block. The whenComplete in addMessage only logs the
failure and preserves the exceptional completion, so clear() can return
exceptionally while leaving immutableBuckets, sharedBucketPriorityQueue,
lastMutableBucket, snapshotSegmentLastIndexMap, and numberDelayedMessages
uncleared.
Could we normalize the previous future before chaining clear, for example
with handle/exceptionally, so clear always runs after the in-flight trim/merge
settles?
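A minimal sketch of the suggested normalization, assuming a simplified tracker: `ClearAfterTrimSketch` and `setTrimFuture` are hypothetical, and the single counter stands in for all of the state the real `clear()` resets. The key line is `trimFuture.handle(...)`, which completes normally whatever the outcome of the in-flight chain, so the clear step is always scheduled after it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified stand-in for the tracker's clear(); not Pulsar's code.
class ClearAfterTrimSketch {
    private CompletableFuture<Void> trimFuture; // in-flight trim+merge chain, if any
    final AtomicLong numberDelayedMessages = new AtomicLong();

    synchronized void setTrimFuture(CompletableFuture<Void> future) {
        trimFuture = future;
    }

    synchronized CompletableFuture<Void> clear() {
        // handle() maps both normal and exceptional completion to a normal null result,
        // so the clear step below runs even if the trim/merge chain failed.
        // (Real code would likely log the failure inside the handler.)
        CompletableFuture<Void> before = trimFuture == null
                ? CompletableFuture.completedFuture(null)
                : trimFuture.handle((ignored, ex) -> null);
        trimFuture = before.thenCompose(__ -> {
            synchronized (this) {
                numberDelayedMessages.set(0); // stands in for clearing queues, maps and buckets
            }
            return CompletableFuture.completedFuture(null); // stands in for cleanImmutableBuckets()
        });
        return trimFuture;
    }
}
```

Because the value is `Void`, `exceptionally(ex -> null)` would work equally well here; `handle` just makes the intent (ignore the outcome, keep the ordering) explicit. A failure inside the clear step itself still propagates through the returned future.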
--
This is an automated message from the Apache Git Service.