shibd commented on code in PR #1340:
URL: https://github.com/apache/pulsar-client-go/pull/1340#discussion_r1972993416


##########
pulsar/consumer_partition.go:
##########
@@ -997,7 +1016,15 @@ func (pc *partitionConsumer) requestSeek(msgID 
*messageID) error {
        if err := pc.requestSeekWithoutClear(msgID); err != nil {
                return err
        }
-       pc.clearReceiverQueue()
+       // When the seek operation is successful, it indicates:
+       // 1. The broker has reset the cursor and sent a request to close the 
consumer on the client side.
+       //    Since this method is in the same goroutine as the 
reconnectToBroker,
+       //    we can safely clear the messages in the queue (at this point, it 
won't contain messages after the seek).
+       // 2. The startMessageID is reset to ensure accurate judgment when 
calling hasNext next time.
+       //    Since the messages in the queue are cleared here reconnection 
won't reset startMessageId.
+       pc.lastDequeuedMsg = nil

Review Comment:
   When executing seek, will clear `messageChan` first and then 
`pauseDispatchMessage` message until the seek operator is complete(reconnect 
success)
   
   Refer to this PR:
   
   https://github.com/apache/pulsar-client-go/pull/1265



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to