BewareMyPower opened a new pull request #10036:
URL: https://github.com/apache/pulsar/pull/10036


   ### Motivation
   
   In the C++ client, a zero queue consumer (i.e. a consumer whose receiver 
queue size is 0) still pre-fetches messages after `pauseMessageListener` is 
called. This is because `ConsumerImpl::increaseAvailablePermits` doesn't check 
the boolean variable `messageListenerRunning_`, which becomes false after 
`pauseMessageListener` is called. Therefore, after the zero queue consumer is 
paused, it still sends a FLOW command to pre-fetch a message into its internal 
unbounded queue `incomingMessages_`.
   
   This behavior can make some messages appear to be lost. For example, for a 
topic with 10 messages, start a shared consumer, consume 3 messages, and pause 
it. Then start another shared consumer on the same subscription. The new 
consumer will start from the 5th message because the 4th message is cached in 
the previous consumer.
   
   ### Modifications
   
   The refactoring changes are:
   - Add a utility function `addAndGet` that extends C++'s `std::atomic` with 
the Java-like operation.
   - Change `messageListenerRunning_`'s type to `std::atomic_int` to avoid 
using a mutex.
   - Change `receiveMessages` to `sendFlowPermitsToBroker`, which makes the 
method more readable. Its checks for `ClientConnection` also avoid a lot of 
debug logging code around each method invocation.
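
   To illustrate the first item, a helper of this kind can be sketched as 
follows. This is a minimal sketch and not necessarily the code in this PR: the 
signature, the template parameter and the `checkAddAndGet` function are 
assumptions made for the example. It relies only on `std::atomic::fetch_add`, 
which returns the value held before the addition.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical helper modeled on Java's AtomicInteger#addAndGet.
// The real helper in the PR may have a different signature.
template <typename T>
inline T addAndGet(std::atomic<T>& value, T delta) {
    // fetch_add returns the previous value, so add delta once more
    // to obtain the value after the update.
    return value.fetch_add(delta) + delta;
}

// Hypothetical usage check, not part of the PR.
inline void checkAddAndGet() {
    std::atomic_int permits{0};
    assert(addAndGet(permits, 3) == 3);   // 0 + 3
    assert(addAndGet(permits, -1) == 2);  // 3 - 1
    assert(permits.load() == 2);
}
```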
   
   Based on the refactoring above, the key changes are:
   - Add the check for `messageListenerRunning_` in the 
`increaseAvailablePermits` method, and make the implementation consistent with 
the Java client's `ConsumerImpl#increaseAvailablePermits`. The type of 
`availablePermits_` is also changed to `std::atomic_int`.
   - Since a paused consumer no longer pre-fetches messages, add an 
`increaseAvailablePermits` invocation in `resumeMessageListener` to send a FLOW 
command after the consumer resumes.
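
   The two key changes can be modeled roughly as below. This is a simplified 
sketch under stated assumptions, not the actual `ConsumerImpl` code: the 
`PermitTracker` class, its member names, the refill threshold parameter and the 
`flowPermitsSent` counter (which stands in for sending a FLOW command) are all 
hypothetical. The flag is a `std::atomic_bool` here for readability, whereas 
the PR uses `std::atomic_int`.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical, heavily simplified model of the consumer's permit logic.
class PermitTracker {
   public:
    explicit PermitTracker(int refillThreshold) : refillThreshold_(refillThreshold) {}

    void increaseAvailablePermits(int delta) {
        int permits = availablePermits_.fetch_add(delta) + delta;
        // Permits keep accumulating while paused, but nothing is sent.
        while (permits >= refillThreshold_ && listenerRunning_.load()) {
            // On failure, compare_exchange_weak reloads `permits` with the
            // current value, so the loop retries with fresh data.
            if (availablePermits_.compare_exchange_weak(permits, 0)) {
                flowPermitsSent_ += permits;  // stands in for a FLOW command
                break;
            }
        }
    }

    void pause() { listenerRunning_ = false; }

    void resume() {
        listenerRunning_ = true;
        // Flush any permits that accumulated while paused.
        increaseAvailablePermits(0);
    }

    int flowPermitsSent() const { return flowPermitsSent_.load(); }
    int availablePermits() const { return availablePermits_.load(); }

   private:
    const int refillThreshold_;
    std::atomic_int availablePermits_{0};
    std::atomic_bool listenerRunning_{true};
    std::atomic_int flowPermitsSent_{0};
};

// Hypothetical walk-through of pause and resume.
inline void checkPauseResume() {
    PermitTracker tracker(1);
    tracker.pause();
    tracker.increaseAvailablePermits(1);
    assert(tracker.flowPermitsSent() == 0);  // paused: no FLOW sent
    tracker.resume();
    assert(tracker.flowPermitsSent() == 1);  // resumed: pending permit flushed
}
```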
   
   Finally, tests are added to verify the `AtomicHelper` utility function and 
the expected behavior of the zero queue consumer.
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   This change added tests and can be verified as follows:
     - Add `AtomicHelperTest` for the extended operation on `std::atomic`, 
which is like Java's atomic `addAndGet`.
     - Add `ZeroQueueSizeTest.testPauseResume` for the case described in the 
Motivation section. The test uses `condition_variable::wait_for` rather than 
`Latch` because, with the previous incorrect implementation, the condition 
might never be met.
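
   The reason for preferring a bounded wait can be sketched as below. This is 
an illustration only, not the test code in this PR: `BoundedWaiter` and its 
methods are hypothetical names. The predicate overload of 
`std::condition_variable::wait_for` returns `false` when the timeout expires 
with the condition still unmet, so a test can fail instead of hanging.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical helper: wait until a counter reaches `expected`,
// or give up once `timeout` has elapsed.
class BoundedWaiter {
   public:
    void signal() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            ++count_;
        }
        cond_.notify_all();
    }

    // Returns true if the condition was met, false on timeout.
    bool waitFor(int expected, std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        return cond_.wait_for(lock, timeout, [&] { return count_ >= expected; });
    }

   private:
    std::mutex mutex_;
    std::condition_variable cond_;
    int count_ = 0;
};

// Hypothetical usage check.
inline void checkBoundedWaiter() {
    BoundedWaiter waiter;
    // Nothing has signaled yet, so the wait times out instead of blocking forever.
    assert(!waiter.waitFor(1, std::chrono::milliseconds(50)));
    waiter.signal();
    // The condition already holds, so this returns true immediately.
    assert(waiter.waitFor(1, std::chrono::milliseconds(50)));
}
```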



