zhanglistar opened a new pull request, #549:
URL: https://github.com/apache/pulsar-client-cpp/pull/549
<!--
### Contribution Checklist
- PR title format should be *[type][component] summary*. For details, see
*[Guideline - Pulsar PR Naming
Convention](https://docs.google.com/document/d/1d8Pw6ZbWk-_pCKdOmdvx9rnhPiyuxwq60_TrD68d7BA/edit#heading=h.trs9rsex3xom)*.
- Fill out the template below to describe the changes contributed by the
pull request. That will give reviewers the context they need to do the review.
- Each pull request should address only one issue, not mix up code from
multiple issues.
- Each commit in the pull request has a meaningful commit message
- Once all items of the checklist are addressed, remove the above text and
this checklist, leaving only the filled out template below.
-->
Fixes #<xyz>
### Motivation
When a reader seeks to `MessageId::latest()`, the broker may close the
consumer and trigger a reconnect. In `seekAsyncInternal`, when the seek
succeeds but the connection is already expired or `reconnectionPending_` is
true, we only set `seekStatus_ = COMPLETED` and did not clear the local receive
queue or update `startMessageId_`. As a result, `incomingMessages_` still held
prefetched messages from before the seek until `connectionOpened()` later
called `clearReceiveQueue()`. During that window, `hasMessageAvailable()` saw
`!incomingMessages_.empty()` and returned true, causing the flaky test
`ReaderSeekTest.testHasMessageAvailableAfterSeekToEnd`.
### Modifications
- In `ConsumerImpl::seekAsyncInternal`, when seek result is OK and
`getCnx().expired() || reconnectionPending_` (reconnection path), we now
perform the same local state updates as the non-reconnection path:
- `ackGroupingTrackerPtr_->flushAndClean()`
- `incomingMessages_.clear()`
- If `lastSeekArg_` holds a `MessageId`, set `startMessageId_` from it
So after the seek is acknowledged, stale prefetched messages are cleared
immediately and `hasMessageAvailable()` no longer returns true from old queue
content before reconnect completes.
### Verifying this change
- [ ] Make sure that the change passes the CI checks.
This change is already covered by existing tests:
- `ReaderSeekTest.testHasMessageAvailableAfterSeekToEnd` (parameterized) –
asserts that after seek to latest, `hasMessageAvailable()` becomes false, then
after producing a new message it becomes true and the message can be read; the
fix removes the flakiness when the broker triggers reconnect on seek-to-end.
### Documentation
- [x] `doc-not-needed`
Bug fix in C++ client implementation only; no API or behavior contract
change and no user-facing docs update required.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]