Vanlightly opened a new pull request #11691:
URL: https://github.com/apache/pulsar/pull/11691
Fixes #11689
### Motivation
When `Consumer.batchReceive()` is called concurrently from different threads,
a race condition in `ConsumerBase.java` can cause a `CompletableFuture` in the
`pendingBatchReceives` queue to be removed from the queue without being
completed, which makes the consumer block forever. This has happened a few
times in production recently.
The issue is that `peekNextBatchReceive` peeks at the queue and then polls it,
and concurrent callers can interleave: the code is only correct if the entry
that is polled is the entry that was peeked. If another thread polls the queue
between a peek and the subsequent poll, a different entry is removed and this
bug occurs. When it does, the error message
`Bug: Removed entry wasn't the expected one` is emitted.
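The bad interleaving can be replayed deterministically on a single thread. The following is a simplified, hypothetical model rather than Pulsar's actual `ConsumerBase` code: each pending batch receive is reduced to a bare `CompletableFuture`, the class and method names are illustrative, and the error message is printed instead of logged.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical, simplified model: each pending batch receive is just a future.
public class PeekPollRace {

    // Replays one bad interleaving step by step on a single thread so the
    // outcome is deterministic. Returns the future that was dropped.
    public static CompletableFuture<String> replayInterleaving() {
        Queue<CompletableFuture<String>> pendingBatchReceives = new ConcurrentLinkedQueue<>();
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<String> second = new CompletableFuture<>();
        first.cancel(false); // the head entry is already done, e.g. timed out
        pendingBatchReceives.add(first);
        pendingBatchReceives.add(second);

        // Thread A peeks the head, sees that it is done, and intends to poll it.
        CompletableFuture<String> peeked = pendingBatchReceives.peek();

        // Thread B runs in between and polls that same head entry.
        pendingBatchReceives.poll();

        // Thread A resumes: its poll now removes `second`, not `peeked`.
        CompletableFuture<String> removed = pendingBatchReceives.poll();
        if (removed != peeked) {
            System.out.println("Bug: Removed entry wasn't the expected one");
        }
        // `removed` is still pending but has left the queue, so nothing will
        // ever complete it and whoever waits on it blocks forever.
        return removed;
    }

    public static void main(String[] args) {
        CompletableFuture<String> dropped = replayInterleaving();
        System.out.println("dropped future done? " + dropped.isDone());
    }
}
```

Running `main` prints the mismatch message followed by `dropped future done? false`: the still-pending future is gone from the queue, which is exactly the "removed but not completed" state described above.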
### Modifications
- Added a lock in `ConsumerBase` to protect every place where a peek and a
poll of `pendingBatchReceives` must happen together.
- Eagerly instantiated `pendingBatchReceives` in `ConsumerImpl` and
`MultiTopicsConsumerImpl`, because the lazy instantiation was not thread safe.
- Replaced the use of `peek` for checking whether a pending queue is empty.
- Renamed the peek and poll methods to iterator-style "has" and "next"
names.
- Housekeeping (clearing already completed futures from the queue) is now
handled only by `nextBatchReceive`.
- Made minor changes to some tests to match the renamed methods.
- Moved some imports to fix checkstyle errors.
- Removed superfluous arguments from `failPendingBatchReceives` and
`failPendingReceives`.
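A minimal sketch of the locking approach, assuming the same simplified model in which each pending batch receive is a bare `CompletableFuture`. Only the `pendingBatchReceives` field name and the `nextBatchReceive` method name come from this PR; the class name `PendingBatchReceives`, the `add` and `hasNextBatchReceive` methods, and all method bodies are hypothetical and may differ from the real change.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified model of the fix: a pending batch receive is a bare future.
public class PendingBatchReceives<T> {

    // Eagerly instantiated and final, so there is no racy lazy initialization.
    private final Queue<CompletableFuture<T>> pendingBatchReceives = new ConcurrentLinkedQueue<>();

    // Guards every peek-then-poll sequence on the queue.
    private final ReentrantLock lock = new ReentrantLock();

    // Registers a new pending batch receive. Adding at the tail needs no lock:
    // only removals from the head must be paired with a peek.
    public CompletableFuture<T> add() {
        CompletableFuture<T> future = new CompletableFuture<>();
        pendingBatchReceives.add(future);
        return future;
    }

    // Emptiness check without peeking. It may return true while only completed
    // entries remain, in which case nextBatchReceive() returns null.
    public boolean hasNextBatchReceive() {
        return !pendingBatchReceives.isEmpty();
    }

    // Removes and returns the next future that is not yet done, discarding
    // completed (cancelled or timed-out) entries along the way; returns null if
    // none is left. Because every removal happens here under the lock, the entry
    // polled is always the entry just peeked.
    public CompletableFuture<T> nextBatchReceive() {
        lock.lock();
        try {
            CompletableFuture<T> head;
            while ((head = pendingBatchReceives.peek()) != null) {
                pendingBatchReceives.poll(); // removes exactly `head`
                if (!head.isDone()) {
                    return head;
                }
            }
            return null;
        } finally {
            lock.unlock();
        }
    }
}
```

A caller would complete whatever `nextBatchReceive()` returns, for example `CompletableFuture<String> f = queue.nextBatchReceive(); if (f != null) f.complete(batch);`. In this toy model a poll-only loop would also be safe; the peek is kept to mirror the peek-then-poll pattern that the lock is meant to pair.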
### Verifying this change
All existing tests that cover the consumer pass.
The bug is hard to reproduce, although I have seen it in production. The only
way I found to trigger it within a few seconds or minutes was to add a short
`Thread.sleep` between the peek and the poll, while calling `batchReceive`
concurrently from many threads with a non-zero `maxNumMessages`. With this fix
applied, and with a temporary sleep added between the peek and the poll, I ran
the reproduction steps from the issue and verified that the bug no longer
occurs.
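The reproduction idea can be sketched as a small stress harness. This is a hypothetical, standalone model, not the procedure or test used for this PR: it does not call `batchReceive` or model `maxNumMessages`. It fills a queue with already completed futures, widens the peek/poll window with a 1 ms sleep, and counts how often a thread polls an entry other than the one it peeked, optionally pairing each peek and poll under a lock.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stress harness: not Pulsar code, just the shape of the repro.
public class PeekPollStress {

    // Counts how often a thread polls an entry other than the one it peeked.
    public static int run(boolean useLock, int threads, int entries) {
        Queue<CompletableFuture<Void>> queue = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < entries; i++) {
            // Completed entries stand in for cancelled or timed-out batch
            // receives, which housekeeping code peeks at and then polls.
            queue.add(CompletableFuture.completedFuture(null));
        }
        ReentrantLock lock = new ReentrantLock();
        AtomicInteger mismatches = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                while (true) {
                    if (useLock) {
                        lock.lock();
                    }
                    try {
                        CompletableFuture<Void> peeked = queue.peek();
                        if (peeked == null) {
                            return; // queue drained
                        }
                        sleepQuietly(1); // widen the peek/poll window
                        if (queue.poll() != peeked) {
                            mismatches.incrementAndGet();
                        }
                    } finally {
                        if (useLock) {
                            lock.unlock();
                        }
                    }
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return mismatches.get();
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("without lock: " + run(false, 16, 200) + " mismatches");
        System.out.println("with lock:    " + run(true, 16, 200) + " mismatches");
    }
}
```

Without the lock, the run will typically report mismatches, since several threads can peek the same head during the sleep; the exact count varies between runs. With the lock, every poll removes the entry that was just peeked, so the count should always be zero.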
### Does this pull request potentially affect one of the following parts:
*If `yes` was chosen, please highlight the changes*
- Dependencies (does it add or upgrade a dependency): (no)
- The public API: (no)
- The schema: (no)
- The default values of configurations: (no)
- The wire protocol: (no)
- The rest endpoints: (no)
- The admin cli options: (no)
- Anything that affects deployment: (no)
### Documentation
#### For contributor
Nothing to document here, purely internal implementation details.
#### For committer
For this PR, do we need to update docs?
- If yes,
- if you update docs in this PR, label this PR with the `doc` label.
- if you plan to update docs later, label this PR with the `doc-required`
label.
- if you need help on updating docs, create a follow-up issue with the
`doc-required` label.
- If no, label this PR with the `no-need-doc` label and explain why.