Nicklee007 opened a new pull request, #15474: URL: https://github.com/apache/pulsar/pull/15474
Fixes #15473

### Motivation

PR [#10087](https://github.com/apache/pulsar/pull/10087) fixed some cases in which expired data could not be cleaned up, but the bug still reappears in other cases.

#### Case 1: `slowestReaderPosition` is reset by another cursor operation before the expired ledger can be trimmed

The topic's producer produced some data and then stopped, so no more messages are added. When `maximumRolloverTimeMs` is reached, the broker checks whether the current ledger is full, closes the old ledger, and creates a new one. After the new ledger has been created, `maybeUpdateCursorBeforeTrimmingConsumedLedger()` updates the cursor's `slowestReaderPosition` to `(newLedgerId, -1)` and immediately triggers `trimConsumedLedgersInBackground()`:

https://github.com/apache/pulsar/blob/a1fb200ff707e9855efb563a27a894664a59c58b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L2284
https://github.com/apache/pulsar/blob/a1fb200ff707e9855efb563a27a894664a59c58b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L2295-L2296

However, the message `retentionTimeMs` is usually longer than `maximumRolloverTimeMs`, so this run of `trimConsumedLedgersInBackground()` cannot clean up the old ledger yet.

If a Flink task then consumes the topic (using a reader to consume and a separate durable cursor to mark its position), the client keeps resetting the durable cursor to `(olderLedgerId, entryNum + 1)`. `internalResetCursor` invokes `internalAsyncMarkDelete`, which invokes `ledger.updateCursor` and resets `slowestReaderPosition` to `(olderLedgerId, entryNum)`. When the next `retentionCheckIntervalInSeconds` tick invokes `trimConsumedLedgersInBackground()`, `slowestReaderPosition` has already been reset to `(olderLedgerId, entryNum)`, so the old ledger is never cleaned up on any later `retentionCheckIntervalInSeconds` tick.
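To make Case 1 concrete, here is a hypothetical, simplified Java model of the trimming rule as described above. It is not Pulsar's `ManagedLedgerImpl` code. It assumes a ledger can be trimmed only when its id is strictly below the ledger of `slowestReaderPosition` and its age exceeds `retentionTimeMs`. The names `TrimModel`, `Ledger`, `Position`, and `trimmableLedgers` are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the trimming rule described in Case 1.
// Names are illustrative; this is not Pulsar's ManagedLedgerImpl code.
public class TrimModel {

    // A ledger: its id and a timestamp (ms) used for the retention check.
    record Ledger(long id, long closedAtMs) {}

    // A (ledgerId, entryId) pair, like a managed-ledger position.
    record Position(long ledgerId, long entryId) {}

    // Returns ids of ledgers that may be trimmed. Assumes `ledgers` is sorted by id.
    // A ledger qualifies only if it lies strictly before the slowest reader's ledger
    // and its retention time has elapsed.
    static List<Long> trimmableLedgers(List<Ledger> ledgers, Position slowestReader,
                                       long nowMs, long retentionTimeMs) {
        List<Long> result = new ArrayList<>();
        for (Ledger ledger : ledgers) {
            if (ledger.id() >= slowestReader.ledgerId()) {
                break; // this ledger and all later ones may still be read
            }
            if (nowMs - ledger.closedAtMs() >= retentionTimeMs) {
                result.add(ledger.id());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long retentionTimeMs = 60_000;
        // Ledger 1 was rolled over at t=0; ledger 2 is the new ledger.
        List<Ledger> ledgers = List.of(new Ledger(1, 0), new Ledger(2, 1_000));

        // Right after rollover: slowest reader is (2, -1), but ledger 1 is not expired yet.
        System.out.println(trimmableLedgers(ledgers, new Position(2, -1), 1_000, retentionTimeMs));
        // A reset/mark-delete moves the slowest reader back to (1, 99):
        // even after retention has elapsed, ledger 1 is not trimmed.
        System.out.println(trimmableLedgers(ledgers, new Position(1, 99), 120_000, retentionTimeMs));
        // Had the slowest reader stayed at (2, -1), ledger 1 would be trimmed.
        System.out.println(trimmableLedgers(ledgers, new Position(2, -1), 120_000, retentionTimeMs));
    }
}
```

In this model, the first retention check happens too early, and every later check is blocked because the slowest reader has been moved back into the old ledger.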
https://github.com/apache/pulsar/blob/ac6bd3c71e24b90826438cd44395ca21a849067f/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L2474

Another cursor operation can cause the same problem when the old ledger has not yet expired at the time `trimConsumedLedgersInBackground()` is triggered after the new ledger is created. Because of `markDeleteLimiter`, the consumer's last mark-delete operation may be left dirty and flushed later by `flushCursorsTask`, which also invokes `ledger.updateCursor(ManagedCursorImpl.this, mdEntry.newPosition)` with `(olderLedgerId, entryNum)`. After that, `trimConsumedLedgersInBackground()` again cannot clean up the old ledger.

https://github.com/apache/pulsar/blob/ac6bd3c71e24b90826438cd44395ca21a849067f/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L3132-L3138

#### Case 2: nothing sets `slowestReaderPosition` to the current position

The most common scenario is that the producer stops at some point and there is a durable cursor but no active consumer. The cursor position is moved forward by `MessageExpiryMonitor`, and the cursor's mark-delete position may lag a few ledgers behind. The last time a ledger was created, `maybeUpdateCursorBeforeTrimmingConsumedLedger()` may not have set `slowestReaderPosition` to the newest ledger `(newLedgerId, -1)`; from then on, `MessageExpiryMonitor` can only reset `slowestReaderPosition` to `(olderLedgerId, entryNum)`. As a result, `trimConsumedLedgersInBackground()` cannot clean up the old ledger either.

### Modifications

1. When `checkConsumedLedgers` is invoked, call `maybeUpdateCursorBeforeTrimmingConsumedLedger()` to check whether a ledger has been consumed completely and, if so, reset the cursor to the next ledger, so that `ConsumedLedgersMonitor` can trim the old ledger data.
2. PR [#14672](https://github.com/apache/pulsar/pull/14672) changed the `rollCurrentLedgerIfFull` logic to create ledgers lazily, but a new ledger must exist in order to reset `slowestReaderPosition` to the new, empty ledger. Therefore, `rollCurrentLedgerIfFull` now also invokes `createLedgerAfterClosed()`.
3. Add or modify some unit tests.

### Documentation

Check the box below or label this PR directly.

Need to update docs?

- [X] `no-need-doc` (Please explain why)
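Modification 1 can be illustrated with a hypothetical, simplified Java sketch. It loosely follows the check the PR describes and does not reproduce the body of `maybeUpdateCursorBeforeTrimmingConsumedLedger()`. Ledgers are modeled as a `TreeMap` from ledger id to entry count. If the mark-delete position sits on the last entry of a ledger and a newer ledger exists, the position is advanced to `(nextLedgerId, -1)`. The names `AdvanceCursorSketch`, `Position`, and `maybeAdvance` are assumptions for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified sketch of "advance the cursor past a fully consumed ledger".
// Not Pulsar's implementation; all names are illustrative.
public class AdvanceCursorSketch {

    // A (ledgerId, entryId) pair, like a managed-ledger position.
    record Position(long ledgerId, long entryId) {}

    // `ledgers` maps ledgerId -> number of entries, ordered by ledgerId.
    // If markDeleted points at the last entry of its ledger and a newer ledger exists,
    // returns (nextLedgerId, -1); otherwise returns markDeleted unchanged.
    static Position maybeAdvance(TreeMap<Long, Long> ledgers, Position markDeleted) {
        Long entries = ledgers.get(markDeleted.ledgerId());
        Map.Entry<Long, Long> next = ledgers.higherEntry(markDeleted.ledgerId());
        if (entries == null || next == null) {
            return markDeleted; // unknown ledger, or no newer ledger to move to
        }
        boolean fullyConsumed = markDeleted.entryId() != -1
                && markDeleted.entryId() + 1 >= entries;
        return fullyConsumed ? new Position(next.getKey(), -1) : markDeleted;
    }

    public static void main(String[] args) {
        TreeMap<Long, Long> ledgers = new TreeMap<>();
        ledgers.put(1L, 100L); // old ledger: entries 0..99
        ledgers.put(2L, 0L);   // new, empty ledger created after rollover

        // Cursor moved to the last entry of ledger 1 (e.g. by MessageExpiryMonitor): advanced.
        System.out.println(maybeAdvance(ledgers, new Position(1, 99)));
        // Cursor still inside ledger 1: unchanged.
        System.out.println(maybeAdvance(ledgers, new Position(1, 50)));
    }
}
```

In this sketch, `maybeAdvance` leaves the position unchanged when no newer ledger exists (`higherEntry` returns `null`). This mirrors the reason given for modification 2: with lazy ledger creation there may be no `(newLedgerId, -1)` position to move the cursor to, so the next ledger is created eagerly via `createLedgerAfterClosed()`.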
