dao-jun opened a new pull request, #25984:
URL: https://github.com/apache/pulsar/pull/25984

   ### Motivation
   
     When a topic's managed ledger trims old ledgers (via retention or 
compaction), the immutable bucket snapshots in BucketDelayedDeliveryTracker 
that cover those deleted ledger ranges become orphaned — their underlying 
message data no longer exists, yet they:
   
     1. Continue to occupy storage as bucket snapshots.
     2. Count against the maxNumBuckets limit, potentially triggering 
unnecessary and expensive merge operations.
   
     Prior to this change, the tracker only merged buckets when the count 
exceeded the limit; it never cleaned up buckets whose ledger range had been 
fully trimmed. This change introduces a trim step that runs before merge, 
deleting orphaned buckets and decrementing the delayed-message counter 
accordingly.
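
     The selection rule behind the trim step can be sketched as follows. This is 
a minimal illustration, not the code in the PR: BucketSketch, 
TrimSelectionSketch, and their fields are hypothetical stand-ins, and the real 
tracker keeps its buckets in its own range-keyed structure.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for an immutable bucket; not Pulsar's actual class. */
final class BucketSketch {
    final long startLedgerId;   // lowest ledger covered by the bucket
    final long endLedgerId;     // highest ledger covered (upper endpoint of the range)
    final long delayedMessages; // delayed messages indexed by the bucket snapshot

    BucketSketch(long startLedgerId, long endLedgerId, long delayedMessages) {
        this.startLedgerId = startLedgerId;
        this.endLedgerId = endLedgerId;
        this.delayedMessages = delayedMessages;
    }
}

final class TrimSelectionSketch {
    /** Returns the buckets whose whole ledger range lies before the first surviving ledger. */
    static List<BucketSketch> selectOrphaned(List<BucketSketch> buckets, long firstSurvivingLedgerId) {
        List<BucketSketch> orphaned = new ArrayList<>();
        for (BucketSketch bucket : buckets) {
            // A bucket that still overlaps a surviving ledger is kept.
            if (bucket.endLedgerId < firstSurvivingLedgerId) {
                orphaned.add(bucket);
            }
        }
        return orphaned;
    }

    /** Delayed-message counter after the orphaned buckets have been dropped. */
    static long remainingDelayedMessages(long currentCount, List<BucketSketch> orphaned) {
        long remaining = currentCount;
        for (BucketSketch bucket : orphaned) {
            remaining -= bucket.delayedMessages;
        }
        return remaining;
    }
}
```

     In this sketch a bucket that straddles the first surviving ledger is kept, 
because part of its message data still exists.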
   
   ### Modifications
   
     Source (BucketDelayedDeliveryTracker.java)
   
     - Added trimFuture: A volatile CompletableFuture<Void> that represents the 
currently in-flight trim/merge/clear operation. It acts as both a gate 
(preventing concurrent trim chains) and a synchronization point for clear().
     - Added asyncTrimImmutableBuckets(): Scans the managed ledger's surviving 
ledger range. Buckets whose upperEndpoint falls entirely before the earliest 
surviving ledger are selected for deletion — these are orphaned and safe to 
remove.
     - Added deleteBucketSnapshot(): Removes a single bucket from 
immutableBuckets, asynchronously deletes its snapshot, and re-adds the bucket 
on failure (rolling back the message count). Failures propagate via 
CompletionException to stop the sequential deletion chain, avoiding wasted 
attempts when storage is unavailable.
     - Changed the merge trigger in addMessage(): Before merging, the tracker 
now runs asyncTrimImmutableBuckets() first. The trimFuture gate ensures only 
one trim/merge chain is in-flight at a time.
     - Added early-return to asyncMergeBucketSnapshot(): If the bucket count is 
already within the limit, merge returns immediately without scanning for merge 
candidates.
     - Rewrote clear(): Instead of immediately clearing state (which could race 
with an in-flight trim's failure callback), clear() now chains itself after any 
pending trimFuture. It also sets trimFuture to the clear chain, blocking new 
trim triggers until the clear completes.
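
     The gating and clear() chaining described above can be sketched as follows. 
This is a simplified illustration under assumptions, not the PR's code: 
TrimGateSketch, tryStartChain(), and the Runnable passed to clear() are 
hypothetical names, and only the trimFuture field name comes from the 
description.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical sketch of a single-flight gate built on one volatile future. */
final class TrimGateSketch {
    // Represents the in-flight trim/merge/clear chain; starts out completed.
    private volatile CompletableFuture<Void> trimFuture = CompletableFuture.completedFuture(null);

    /** Starts a trim-then-merge chain unless one is still in flight; returns true if started. */
    synchronized boolean tryStartChain(Supplier<CompletableFuture<Void>> trimThenMerge) {
        if (!trimFuture.isDone()) {
            return false; // gate closed: another chain (or a clear) is pending
        }
        trimFuture = trimThenMerge.get();
        return true;
    }

    /** Resets state only after the pending chain has finished, whether it succeeded or failed. */
    synchronized CompletableFuture<Void> clear(Runnable resetState) {
        CompletableFuture<Void> clearChain = trimFuture
                .exceptionally(ex -> null) // a failed trim must not prevent the clear
                .thenRun(resetState);
        trimFuture = clearChain;           // blocks new trim triggers until the clear completes
        return clearChain;
    }
}
```

     Because the reset runs only after the pending chain settles, a late failure 
callback from an in-flight trim cannot re-add a bucket to state that has already 
been cleared.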
   
     Test (BucketDelayedDeliveryTrackerTest.java)
   
     Added 4 tests using a helper that injects a mocked ManagedLedger:
   
     | Test | Scenario |
     | -- | -- |
     | testTrimRemovesOrphanedBuckets | Buckets covering ledgers below 
firstLedgerId are removed, remaining ones have ranges ≥ firstLedgerId |
     | testTrimHandlesDeleteFailure | Injected delete exception is consumed, 
tracker remains operational |
     | testTrimWithNoOrphanedBuckets | With firstLedgerId=0, trim is a no-op, 
merge runs normally |
     | testMergeEarlyReturnWhenWithinLimit | With maxNumBuckets far above 
actual count, merge returns early without compaction |
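
     The delete-failure scenario can be sketched as a sequential chain that 
rolls back the failed bucket and skips the rest. This is an illustrative sketch 
only: SequentialDeleteSketch, deleteOne(), deleteAll(), and the storageDelete 
function are hypothetical, and bucket ids stand in for real bucket objects. The 
tracker described in this PR also restores the message count on rollback, which 
the sketch omits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

/** Hypothetical sketch: delete buckets one by one and stop at the first failure. */
final class SequentialDeleteSketch {
    final List<Long> buckets = new ArrayList<>(); // ids of buckets still tracked
    int deleteAttempts = 0;

    CompletableFuture<Void> deleteOne(Long bucketId, Function<Long, CompletableFuture<Void>> storageDelete) {
        buckets.remove(bucketId); // remove first so the bucket no longer counts against the limit
        deleteAttempts++;
        return storageDelete.apply(bucketId).exceptionally(ex -> {
            buckets.add(bucketId);              // roll back: the snapshot still exists in storage
            throw new CompletionException(ex);  // propagate so later deletions are skipped
        });
    }

    CompletableFuture<Void> deleteAll(List<Long> orphaned, Function<Long, CompletableFuture<Void>> storageDelete) {
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (Long id : orphaned) {
            // thenCompose does not invoke its function once the chain has failed.
            chain = chain.thenCompose(v -> deleteOne(id, storageDelete));
        }
        return chain;
    }
}
```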
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   This change added tests and can be verified as follows:
   
     - Added 4 unit tests to BucketDelayedDeliveryTrackerTest.java covering 
orphaned-bucket trimming, delete failure handling, the no-orphan case, and the 
merge early return.
   
   ### Does this pull request potentially affect one of the following parts:
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   *If the box was checked, please highlight the changes*
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [ ] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [ ] The threading model
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] The metrics
   - [ ] Anything that affects deployment


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
