BewareMyPower commented on code in PR #1340:
URL: https://github.com/apache/pulsar-client-go/pull/1340#discussion_r1972902281
##########
pulsar/consumer_partition.go:
##########
@@ -997,7 +1016,15 @@ func (pc *partitionConsumer) requestSeek(msgID *messageID) error {
if err := pc.requestSeekWithoutClear(msgID); err != nil {
return err
}
- pc.clearReceiverQueue()
+ // When the seek operation is successful, it indicates:
+ // 1. The broker has reset the cursor and sent a request to close the consumer on the client side.
+ // Since this method is in the same goroutine as the reconnectToBroker,
+ // we can safely clear the messages in the queue (at this point, it won't contain messages after the seek).
+ // 2. The startMessageID is reset to ensure accurate judgment when calling hasNext next time.
+ // Since the messages in the queue are cleared here reconnection won't reset startMessageId.
+ pc.lastDequeuedMsg = nil
Review Comment:
I'm concerned that there is a race here. Before this change,
`lastDequeuedMsg` was only updated by `setLastDequeuedMsg`, which is only called
by the `Next` method in the user's goroutine. After this change,
`lastDequeuedMsg` can also be reset to nil in the event loop goroutine
(`runEventsLoop -> internalSeek -> requestSeek`), so the field is now written
from two goroutines.
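
To illustrate the concern, here is a minimal sketch of one way to make the two writers safe, assuming a mutex guarding the field is acceptable. `consumerSketch`, its `mu` field, and the `resetLastDequeuedMsg` and `getLastDequeuedMsg` helpers are hypothetical names for illustration only, not the actual `partitionConsumer` code:

```go
package main

import (
	"fmt"
	"sync"
)

// messageID is a hypothetical stand-in for the client's message ID type.
type messageID struct {
	ledgerID int64
	entryID  int64
}

// consumerSketch is a hypothetical, reduced model of partitionConsumer.
// The mutex is an assumption; it is not part of the PR under review.
type consumerSketch struct {
	mu              sync.Mutex
	lastDequeuedMsg *messageID
}

// setLastDequeuedMsg models the write done from the user's goroutine (Next).
func (c *consumerSketch) setLastDequeuedMsg(id *messageID) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.lastDequeuedMsg = id
}

// resetLastDequeuedMsg models the write done from the event loop (requestSeek).
func (c *consumerSketch) resetLastDequeuedMsg() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.lastDequeuedMsg = nil
}

// getLastDequeuedMsg returns the current value under the same lock.
func (c *consumerSketch) getLastDequeuedMsg() *messageID {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.lastDequeuedMsg
}

func main() {
	c := &consumerSketch{}
	var wg sync.WaitGroup
	wg.Add(2)

	// Simulates the user's goroutine repeatedly calling Next.
	go func() {
		defer wg.Done()
		for i := int64(0); i < 1000; i++ {
			c.setLastDequeuedMsg(&messageID{ledgerID: 1, entryID: i})
		}
	}()

	// Simulates the event loop goroutine handling seek requests.
	go func() {
		defer wg.Done()
		for i := 0; i < 1000; i++ {
			c.resetLastDequeuedMsg()
		}
	}()

	wg.Wait()
	// The final value depends on scheduling, so only report whether it is nil.
	fmt.Println("final value is nil:", c.getLastDequeuedMsg() == nil)
}
```

Without the lock, running the same two goroutines under `go run -race` should report a data race on the field. Note that the lock only removes the data race: a `Next` call that completes just after the seek can still overwrite the reset value, so the ordering between the two goroutines would need separate handling.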