poorbarcode opened a new pull request, #20737:
URL: https://github.com/apache/pulsar/pull/20737
### Motivation
**Background of consumer reconnects**
- grab connection:
- lookup the broker which owned the topic
- get the existing connection by the broker; create one if it does not
exist
- clear the messages in memory
- send `CMD-subscribe` to the broker
- send `flow permits` to broker to increment `availablePermits`
**Background messages lost**
| time | `subscribe 1st` | `subscribe 2nd` |
| --- | --- | --- |
| 1 | subscribe start |
| 2 | Clear messages in memory |
| 3 | subscribe success |
| 4 | receive 712 messages |
| 5 | | subscribe start |
| 6 | Clear messages in memory.<strong>(Highlight)</strong> 712 messages
were lost |
| 7 | subscribe success |
---
**(Highlight)Issue: the same consumer subscribed twice**
| time | `subscribe 1st and 2nd` | `unload topic` | `reconnect due to unload
topic` | `reconnect due to subscribe 2nd timeout` |
| --- | --- | --- | --- | --- |
| 1 | subscribe 1st success |
| 2 | | unload the topic |
| 3 | subscribe 2nd started due to `unload topic` |
| 4 | the consumer registered on the subscription, and the future of
creating consumer is not complete yet |
| 5 | | unload the topic |
| 6 | | | subscribe 3rd started due to `unload topic` | subscribe 4th
started due to `subscribe timeout` |
| 7 | | | compare and set `consumer.cnx` to null | set `consumer.cnx` to
null |
| 8 | | | | ask broker remove the previous consumer |
| 9 | | | subscribe success |
| 10 | | | | subscribe success 2nd |
The above process can be reproduced using the test
`testCnxInactiveWhenDoingSubscribe` in the PR
https://github.com/apache/pulsar/pull/20735.
Note: The above process only simulates the simultaneous execution of
`subscribe` and `unload topic`, and the scenario will be more complicated if
the event `channel in-active` are added because the event `channel in-active`
will trigger other events like this:
- another `reconnect`
- more time-out probability because the response of `cmd-subscribe` can not
send out
- make the request `ask broker remove the previous consumer` will be
invalidated because the channel can not send data out.
### Modifications
Always ask the broker to remove the previous same consumer when reconnecting
### Documentation
<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
- [ ] `doc` <!-- Your PR contains doc changes. -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update
later -->
- [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->
### Matching PR in forked repository
PR in forked repository: x
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]