shibd opened a new pull request, #1340: URL: https://github.com/apache/pulsar-client-go/pull/1340
### Background

There are **three attributes** that handle seek/filter/subscribe for the Reader:

1. `startMessageID`: The start message ID, which the user can specify when creating the reader. It can be set to a specific position, or to `earliest` or `latest`. When creating a consumer, this is [set in the request](https://github.com/apache/pulsar-client-go/blob/9aeeb4e7bbfcaa4039d66ab2857888be2d6fcaf3/pulsar/consumer_partition.go#L1979), and the broker uses it to set the mark-delete position.
2. `lastDequeuedMsg`: The most recent message dispatched from the consumer's internal queue to the user. **If it's nil, no messages have been delivered to the user. If it has a value, the user has consumed up to this point.**
3. `lastMessageInBroker`: The ID of the last message in the topic on the broker.

#### Question-1: How to determine hasNext?

[Refer to this code](https://github.com/apache/pulsar-client-go/blob/9aeeb4e7bbfcaa4039d66ab2857888be2d6fcaf3/pulsar/consumer_partition.go#L2224). In simple terms, to minimize interaction with the broker:

1. If `lastDequeuedMsg` has a value, it clearly indicates consumption up to this position, so compare `lastDequeuedMsg` with `lastMessageInBroker`.
2. If it has no value, compare `startMessageID` with `lastMessageInBroker`.

If the comparison result is **true**, there really are messages. If it is **false**, that doesn't necessarily mean there are no messages, because `lastMessageInBroker` is a cache and the topic may already have more messages written. Therefore, **`lastMessageInBroker` is fetched from the broker again and the comparison is re-evaluated.**

#### Question-2: What happens when the consumer reconnects?

When the consumer reconnects, it clears all messages in the receive queue and **updates `startMessageID` with the first message in the queue**, because that position indicates how far the user has consumed.

#### !!!
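A minimal Go sketch of the hasNext decision from Question-1, for illustration only. `msgID`, `readerState`, and `fetchLastMessageID` are hypothetical stand-ins, not the client's real types: message IDs are reduced to (ledger, entry), and `fetchLastMessageID` stands in for the round-trip that refreshes `lastMessageInBroker`. The `startMessageIDInclusive` branch is an assumption modelled on the reader's inclusive-start option.

```go
package main

import "fmt"

// msgID is a simplified, hypothetical message ID: ledger and entry only.
type msgID struct{ ledger, entry int64 }

// compare orders IDs by ledger, then entry, returning -1, 0 or +1.
func (a msgID) compare(b msgID) int {
	if a.ledger != b.ledger {
		if a.ledger < b.ledger {
			return -1
		}
		return 1
	}
	switch {
	case a.entry < b.entry:
		return -1
	case a.entry > b.entry:
		return 1
	}
	return 0
}

// readerState models the three attributes described above (hypothetical).
type readerState struct {
	startMessageID          msgID
	startMessageIDInclusive bool
	lastDequeuedMsg         *msgID // nil: nothing delivered to the user yet
	lastMessageInBroker     msgID  // cached, may lag behind the topic
	fetchLastMessageID      func() msgID
}

// hasMoreMessages evaluates the comparison against the cached value only.
func (s *readerState) hasMoreMessages() bool {
	if s.lastDequeuedMsg != nil {
		// The user has consumed up to lastDequeuedMsg.
		return s.lastMessageInBroker.compare(*s.lastDequeuedMsg) > 0
	}
	// Nothing delivered yet: compare against the start position.
	c := s.lastMessageInBroker.compare(s.startMessageID)
	if s.startMessageIDInclusive {
		return c >= 0
	}
	return c > 0
}

// hasNext trusts a "true" from the cache, but treats "false" as possibly
// stale: it refreshes lastMessageInBroker once and re-evaluates.
func (s *readerState) hasNext() bool {
	if s.hasMoreMessages() {
		return true
	}
	s.lastMessageInBroker = s.fetchLastMessageID()
	return s.hasMoreMessages()
}

func main() {
	s := &readerState{
		startMessageID:      msgID{1, 2},
		lastMessageInBroker: msgID{1, 2}, // stale: the topic has grown to {1 5}
		fetchLastMessageID:  func() msgID { return msgID{1, 5} },
	}
	fmt.Println(s.hasNext()) // true, but only after one refresh

	delivered := msgID{1, 5}
	s.lastDequeuedMsg = &delivered
	fmt.Println(s.hasNext()) // false: the user has caught up
}
```

The cheap cached comparison is tried first so that the common "yes, there is more" case never contacts the broker; only a negative answer pays for a refresh.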
The above logic works well without a seek operation, but once the user calls seek, the situation becomes special.

#### Question-3: How does the seek operation work?

**For the broker:** When the client sends a seek request, the broker does the following:

1. Rewinds the cursor to the seek position (message ID or time).
2. Sends a close request to the consumer.
3. Responds that the seek operation is complete.

**For the client:**

1. Send a seek request and wait for the broker to respond that the seek is complete.
2. Set `lastDequeuedMsg` to nil, because after seeking this value is no longer valid. **(Tip: Currently missing, this PR fixes it.)**
3. Clear the queue, so that `startMessageID` is not reset to the first message in the queue upon reconnection. **(Tip: Currently missing, this PR fixes it.)**
4. If seeking by ID:
   - If the passed ID is a specific position or `earliest`, set `startMessageID` to the seek ID. **(Tip: Currently missing, this PR fixes it.)**
   - If the passed ID is `latest`, obtain the topic's `lastMessageId` and set `startMessageID` to it. **(Tip: Currently missing; a separate PR will fix this later, as the reader [currently doesn't support directly passing the latest ID](https://github.com/apache/pulsar-client-go/blob/9aeeb4e7bbfcaa4039d66ab2857888be2d6fcaf3/pulsar/reader_impl.go#L210-L214).)**
5. If seeking by time, the client doesn't know which message ID corresponds to the given time, so it needs to call `getLastMessageId` and use the result to set `startMessageID`. **(Tip: Currently missing, this PR fixes it.)**
6. The seek is complete; return to the user.
7. BTW: this only completes the seek request itself. Full completion requires the consumer to reconnect to the broker successfully, but that happens in the background and the user doesn't need to be concerned with it.
Also, **note that we don't need to wait for the consumer to reconnect to the broker before returning seek completion to the user**, because when the broker responds that the seek is complete, **it has already reset the cursor.** By the time the client has handled the response, it has cleared the queue, so no earlier messages will come in. In extremely rare cases (**where data from the old connection's buffer re-enters the queue after clearing**), [we filter these messages by comparing their IDs against `startMessageID`](https://github.com/apache/pulsar-client-go/blob/9aeeb4e7bbfcaa4039d66ab2857888be2d6fcaf3/pulsar/consumer_partition.go#L1475-L1486).

### Modifications

You can run the tests `TestReaderWithSeekByID` and `TestReaderWithSeekByTime` to understand the user-side issues fixed by this PR. Simply put, after seeking, calling `hasNext` again should return the expected result.

### Verifying this change

1. Added `TestReaderWithSeekByID` and `TestReaderWithSeekByTime` to cover this change.
2. I did not change any other tests; the other tests passing shows that no breaking change is introduced.

### Does this pull request potentially affect one of the following parts:

- No

### Documentation

- No
