BewareMyPower opened a new pull request #10036:
URL: https://github.com/apache/pulsar/pull/10036
### Motivation
In C++ client, zero queue consumer (i.e. the consumer's receiver queue size
is 0) will still pre-fetch messages after `pauseMessageListener` is called.
It's because `ConsumerImpl::increaseAvailablePermits` doesn't check the boolean
variable `messageListenerRunning_`, which becomes false after
`pauseMessageListener` is called. Therefore, after the zero queue consumer is
paused, it will still send FLOW command to pre-fetch a message to its internal
unbounded queue `incomingMessages_`.
This behavior may cause some messages looks like being lost. For example,
for a topic with 10 messages, start a shared consumer to consume 3 messages and
pause. Then start another shared consumer to the same subscription. The new
consumer will start from the 5th message because the 4th message is cached in
the previous consumer.
### Modifications
Here're the refactor changes:
- Add a util function `addAndGet` to extend C++'s `std::atomic` with the
Java like operation.
- Change `messageListenerRunning_`'s type to `std::atomic_int` to avoid
usage of mutex.
- Change `receiveMessages` to `sendFlowPermitsToBroker`, which makes the
method more readable and the checks for `ClientConnection` avoids lots of debug
logs code outside the method invocation.
Based on the refactors above, the key changes are:
- Add the check for `messageListenerRunning_` in `increaseAvailablePermits`
method, and make the implementation consistent with Java client's
`ConsumerImpl#increaseAvailablePermits`. Also the type of `availablePermits_`
is changed to `std::atomic_int`.
- Since `pauseMessageListener` doesn't pre-fetch messages any more, add the
`increaseAvailablePermits` invocation in `resumeMessageListener` to send FLOW
command after consumer resumes.
Finally the tests are added to verify the `AtomicHelper`'s util function and
the expected behavior of zero queue consumer.
### Verifying this change
- [ ] Make sure that the change passes the CI checks.
This change added tests and can be verified as follows:
- Add `AtomicHelperTest` for extended operation on `std::atomic`, which is
like Java's atomic `addAndGet`.
- Add `ZeroQueueSizeTest.testPauseResume` for the case I've described in
Motivation section before. Here we use `condition_variable::wait_for` method
but not `Latch` because the condition may never meet in the previous wrong
implementation.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]