BewareMyPower commented on code in PR #1340:
URL: https://github.com/apache/pulsar-client-go/pull/1340#discussion_r1972902281


##########
pulsar/consumer_partition.go:
##########
@@ -997,7 +1016,15 @@ func (pc *partitionConsumer) requestSeek(msgID 
*messageID) error {
        if err := pc.requestSeekWithoutClear(msgID); err != nil {
                return err
        }
-       pc.clearReceiverQueue()
+       // When the seek operation is successful, it indicates:
+       // 1. The broker has reset the cursor and sent a request to close the 
consumer on the client side.
+       //    Since this method is in the same goroutine as the 
reconnectToBroker,
+       //    we can safely clear the messages in the queue (at this point, it 
won't contain messages after the seek).
+       // 2. The startMessageID is reset to ensure accurate judgment when 
calling hasNext next time.
+       //    Since the messages in the queue are cleared here reconnection 
won't reset startMessageId.
+       pc.lastDequeuedMsg = nil

Review Comment:
   I have a concern that if there is a race here. Before this change, 
`lastDequeuedMsg` is only updated by `setLastDequeuedMsg`, which is only called 
by the `Next` method called in user's goroutine. However, after that, 
`lastDequeuedMsg` could be reset with nil in the event loop goroutine 
(`runEventsLoop -> internalSeek -> requestSeek`)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to