summeriiii opened a new pull request, #23305:
URL: https://github.com/apache/pulsar/pull/23305

   Fixes https://github.com/apache/pulsar/issues/23239
   
   ### Motivation
   #### Briefly describe the issue: 
   After a consumer has consumed all messages, msgBacklog is 0. After a topic 
unload, however, msgBacklog becomes 1 instead of staying at 0, which is not 
the expected behavior. The test 
NonDurableSubscriptionTest#testNonDurableSubscriptionBackLogAfterTopicUnload 
reproduces this.
       
   #### Root cause:
   I analysed the related code and found that this issue was introduced by 
https://github.com/apache/pulsar/pull/4331. That PR removed the check 
   `if (((BatchMessageIdImpl) msgId).getBatchIndex() >= 0) {` outright, so 
non-batch messages now also execute the step 
`entryId = msgId.getEntryId() - 1;`. As a result, entryId is reduced by 1 and 
msgBacklog becomes 1 instead of 0.
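
   The off-by-one can be illustrated with a toy model. This is only a sketch, 
not broker code: the class and method names are hypothetical, and it assumes 
that the backlog is the number of entries strictly after the position the 
non-durable cursor starts from.

```java
// Toy model of the off-by-one; all names here are hypothetical, not Pulsar APIs.
final class BacklogSketch {

    // Assumption: backlog = entries strictly after the cursor's start entry.
    static long backlog(long lastEntryId, long cursorEntryId) {
        return Math.max(0, lastEntryId - cursorEntryId);
    }

    // Behaviour described above for the code after PR #4331:
    // the step back is applied to every message id, batched or not.
    static long startEntryAlwaysStepBack(long entryId) {
        return entryId - 1;
    }

    public static void main(String[] args) {
        long lastEntryId = 9;   // ten entries with ids 0..9, all consumed
        long resumeEntryId = 9; // the consumer resumes from the last consumed entry

        // Without the step back, nothing is left to read.
        System.out.println(backlog(lastEntryId, resumeEntryId)); // 0

        // With the unconditional step back, one already-consumed entry is counted again.
        System.out.println(backlog(lastEntryId, startEntryAlwaysStepBack(resumeEntryId))); // 1
    }
}
```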
    
   https://github.com/apache/pulsar/pull/4331 also added the 
`ReaderBuilder#startMessageIdInclusive` interface, which allows creating a 
Reader whose reads include startMessageId, but it was not implemented 
correctly. We should add the field **resetIncludeHead** to CommandSubscribe to 
implement it.
     
   ### Modifications
   - Add the field **resetIncludeHead** to CommandSubscribe. It is true when 
ReaderBuilder#startMessageIdInclusive is used and false otherwise.
   - Add the condition `(msgId.getBatchIndex() >= 0 || resetIncludeHead)` to 
the Persist#getNonDurableSubscription method, so that `entryId - 1` is applied 
only **when the message is a batch message or resetIncludeHead is true.**
       
   ```java
   if (ledgerId >= 0 && entryId >= 0
           && msgId instanceof BatchMessageIdImpl
           && (msgId.getBatchIndex() >= 0 || resetIncludeHead)) {
       // When the start message is relative to a batch, we need to take
       // one step back on the previous message, because the "batch" might
       // not have been consumed in its entirety.
       // The client will then be able to discard the first messages if needed.
       entryId = msgId.getEntryId() - 1;
   }
   ```  
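
   The condition in the snippet above can be exercised in isolation. The 
following is a minimal sketch rather than the broker implementation: the class 
`StartEntrySketch`, the method `resolveStartEntryId`, and the 
`isBatchMessageId` flag (standing in for `msgId instanceof BatchMessageIdImpl`) 
are hypothetical names used only for illustration.

```java
// Standalone sketch of the start-entry decision; names are hypothetical.
final class StartEntrySketch {

    // Returns the entry id the subscription would start from under the condition above.
    static long resolveStartEntryId(long ledgerId, long entryId, boolean isBatchMessageId,
                                    int batchIndex, boolean resetIncludeHead) {
        if (ledgerId >= 0 && entryId >= 0
                && isBatchMessageId
                && (batchIndex >= 0 || resetIncludeHead)) {
            // Step back one entry so the start entry is delivered again.
            return entryId - 1;
        }
        // Otherwise the entry id is left unchanged.
        return entryId;
    }

    public static void main(String[] args) {
        // Non-batch message id: no step back.
        System.out.println(resolveStartEntryId(5, 9, false, -1, false)); // 9
        // Batch message id with a batch index: step back.
        System.out.println(resolveStartEntryId(5, 9, true, 2, false));   // 8
        // Batch message id type, no batch index, inclusive start requested: step back.
        System.out.println(resolveStartEntryId(5, 9, true, -1, true));   // 8
        // Batch message id type, no batch index, not inclusive: no step back.
        System.out.println(resolveStartEntryId(5, 9, true, -1, false));  // 9
    }
}
```
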
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc` <!-- Your PR contains doc changes. -->
   - [ ] `doc-required` <!-- Your PR changes impact docs and you will update 
later -->
   - [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
   - [ ] `doc-complete` <!-- Docs have been already added -->
   
   ### Matching PR in forked repository
   
   PR in forked repository: <!-- ENTER URL HERE -->
   


