Nicklee007 opened a new pull request, #15474:
URL: https://github.com/apache/pulsar/pull/15474

   Fixes #15473
   
   ### Motivation
   PR [#10087](https://github.com/apache/pulsar/pull/10087) fixed some cases 
in which expired data could not be cleaned up, but the bug still appears in 
other cases.
   
   #### Case 1: `slowestReaderPosition` is reset by another cursor operation 
before the expired ledger can be trimmed
   A producer writes some data to the topic and then stops, so no more 
messages are added. When `maximumRolloverTimeMs` is reached, the current 
ledger is treated as full: the old ledger is closed and a new ledger is 
created. After the ledger creation completes, 
`maybeUpdateCursorBeforeTrimmingConsumedLedger()` updates the cursor's 
`slowestReaderPosition` to `(newLedgerId,-1)` and triggers 
`trimConsumedLedgersInBackground()` immediately.
   
https://github.com/apache/pulsar/blob/a1fb200ff707e9855efb563a27a894664a59c58b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L2284
   
https://github.com/apache/pulsar/blob/a1fb200ff707e9855efb563a27a894664a59c58b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L2295-L2296
   
   But in most cases the message `retentionTimeMs` is longer than 
`maximumRolloverTimeMs`, so the old ledger has not expired yet and cannot be 
cleaned up by this run of `trimConsumedLedgersInBackground`.
   Then, if a Flink task consumes the topic (using a reader to consume and a 
separate durable cursor to mark the position), the client keeps resetting the 
durable cursor to the position `(olderLedgerId, entryNum + 1)`. 
`internalResetCursor` invokes `internalAsyncMarkDelete`, which invokes 
`ledger.updateCursor` and resets `slowestReaderPosition` to 
`(olderLedgerId, entryNum)`. When the next `retentionCheckIntervalInSeconds` 
is reached and `trimConsumedLedgersInBackground` is invoked, 
`slowestReaderPosition` has already been reset to `(olderLedgerId, entryNum)`, 
so the old ledger cannot be cleaned up at any later 
`retentionCheckIntervalInSeconds` check.
   
https://github.com/apache/pulsar/blob/ac6bd3c71e24b90826438cd44395ca21a849067f/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L2474
   
   Likewise, because the old ledger has not expired when 
`trimConsumedLedgersInBackground` is triggered right after the ledger is 
created, another cursor operation can cause the same problem: the 
`markDeleteLimiter` leaves the consumer's last mark-delete operation dirty, so 
it has to be flushed by `flushCursorsTask`, which also invokes 
`ledger.updateCursor(ManagedCursorImpl.this, mdEntry.newPosition)` and sets the 
position to `(olderLedgerId, entryNum)`. After that, the old ledger can no 
longer be cleaned up by `trimConsumedLedgersInBackground`.
   
https://github.com/apache/pulsar/blob/ac6bd3c71e24b90826438cd44395ca21a849067f/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L3132-L3138
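
The timeline of case 1 can be replayed with a small model. This is only a sketch under stated assumptions, not Pulsar's code: `TrimModel`, `closedLedgers`, `replayCase1` and the numeric values are hypothetical, and the trimming rule is simplified to "a ledger is removed only if it lies strictly before the slowest reader's ledger and its retention time has elapsed".

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of ledger trimming; not Pulsar's real classes.
class TrimModel {
    // Closed ledgers only: ledgerId -> time (ms) at which the ledger was closed.
    final TreeMap<Long, Long> closedLedgers = new TreeMap<>();
    final long retentionTimeMs;
    // Ledger id part of slowestReaderPosition (the entry id is omitted here).
    long slowestReaderLedgerId;

    TrimModel(long retentionTimeMs) {
        this.retentionTimeMs = retentionTimeMs;
    }

    // Assumed rule: a ledger is removed only if it lies strictly before the
    // slowest reader's ledger AND its retention time has elapsed.
    int trim(long nowMs) {
        int removed = 0;
        Iterator<Map.Entry<Long, Long>> it =
                closedLedgers.headMap(slowestReaderLedgerId, false).entrySet().iterator();
        while (it.hasNext()) {
            if (nowMs - it.next().getValue() >= retentionTimeMs) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    // Replays the case 1 timeline and returns how many ledgers each trim removed.
    static int[] replayCase1() {
        TrimModel ml = new TrimModel(60_000);  // retention longer than the rollover time
        ml.closedLedgers.put(1L, 10_000L);     // rollover closes ledger 1 at t = 10s
        ml.slowestReaderLedgerId = 2;          // cursor moved to (2, -1)
        int atRollover = ml.trim(10_000);      // ledger 1 has not expired yet
        ml.slowestReaderLedgerId = 1;          // reset/flush moves it back to (1, n)
        int afterExpiry = ml.trim(100_000);    // expired, but no longer a candidate
        return new int[] {atRollover, afterExpiry};
    }

    public static void main(String[] args) {
        int[] removed = replayCase1();
        System.out.println("removed at rollover: " + removed[0]);
        System.out.println("removed after expiry: " + removed[1]);
    }
}
```

In this model both trims remove nothing: the first because ledger 1 has not expired, the second because the slowest reader has been moved back into ledger 1.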
   
   #### Case 2: no condition sets `slowestReaderPosition` to the current 
position
   The most common scenario here is that the producer has stopped and there is 
a durable cursor but no active consumer, so the cursor position is moved 
forward only by `MessageExpiryMonitor`. The cursor's mark-delete position may 
be a few ledgers behind, so the last ledger creation and its call to 
`maybeUpdateCursorBeforeTrimmingConsumedLedger` may not have set 
`slowestReaderPosition` to the newest ledger `(newLedgerId,-1)`. After that, 
`MessageExpiryMonitor` can only reset `slowestReaderPosition` to 
`(olderLedgerId, entryNum)`, and the old ledger can no longer be cleaned up by 
`trimConsumedLedgersInBackground`.
   
   
   ### Modifications
   
   1. When `checkConsumedLedgers` is invoked, use 
`maybeUpdateCursorBeforeTrimmingConsumedLedger()` to check whether a ledger 
has been completely consumed and, if so, reset the cursor to the next ledger, 
so that the `ConsumedLedgersMonitor` can trim the old ledger data.
   2. PR [#14672](https://github.com/apache/pulsar/pull/14672) changed the 
`rollCurrentLedgerIfFull` logic to create ledgers lazily, but a new ledger 
must exist before `slowestReaderPosition` can be reset to the new empty 
ledger. So `rollCurrentLedgerIfFull` now also invokes 
`createLedgerAfterClosed()`.
   3. Add/modify some unit tests.
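
The check described in modification 1 could look roughly like the sketch below. It is an illustration of the idea only, not the code in this PR: `CursorAdvance`, `Position` and `advanceIfConsumed` are hypothetical names, and the ledger list is reduced to a map from ledger id to entry count.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the "move past a fully consumed ledger" check.
class CursorAdvance {
    // Minimal stand-in for a (ledgerId, entryId) position.
    static final class Position {
        final long ledgerId;
        final long entryId;

        Position(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }
    }

    // ledgers: ledgerId -> number of entries in that ledger.
    // Returns (nextLedgerId, -1) when markDelete points at the last entry of
    // its ledger and a later ledger exists; otherwise returns markDelete as is.
    static Position advanceIfConsumed(TreeMap<Long, Long> ledgers, Position markDelete) {
        Long entries = ledgers.get(markDelete.ledgerId);
        Map.Entry<Long, Long> next = ledgers.higherEntry(markDelete.ledgerId);
        if (entries == null || next == null) {
            return markDelete; // unknown ledger, or already on the newest ledger
        }
        if (markDelete.entryId != -1 && markDelete.entryId + 1 >= entries) {
            return new Position(next.getKey(), -1);
        }
        return markDelete;
    }

    public static void main(String[] args) {
        TreeMap<Long, Long> ledgers = new TreeMap<>();
        ledgers.put(1L, 10L); // old ledger with entries 0..9
        ledgers.put(2L, 0L);  // new, still empty ledger
        Position p = advanceIfConsumed(ledgers, new Position(1, 9));
        System.out.println(p.ledgerId + ":" + p.entryId);
    }
}
```

Running the check on every periodic pass, rather than only once after ledger creation, is what lets a position that was reset to the end of the old ledger move forward again. It also shows why a next ledger has to exist (modification 2): without one there is no position to advance to.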
   
   
   
   ### Documentation
   
   Check the box below or label this PR directly.
   Need to update docs? 
   - [X] `no-need-doc` 
   (Please explain why)
   

