shibd opened a new pull request, #1340:
URL: https://github.com/apache/pulsar-client-go/pull/1340

   ### Background
   
   There are **three attributes** that govern seek, filtering, and subscription for a Reader.
   
   1. `startMessageID`: The start message ID, which the user can specify when creating the reader. It can be set to a specific position, or to `earliest` or `latest`. When the consumer is created, this value is [set in the request](https://github.com/apache/pulsar-client-go/blob/9aeeb4e7bbfcaa4039d66ab2857888be2d6fcaf3/pulsar/consumer_partition.go#L1979), and the broker uses it to set the mark-delete position.
   2. `lastDequeuedMsg`: The most recent message dispatched from the consumer's internal queue to the user. **If it is nil, no messages have been delivered to the user yet. If it is set, the user has consumed up to this point.**
   3. `lastMessageInBroker`: The ID of the last message in the topic on the 
broker.
   
   #### Question-1: How is hasNext determined?
   
   [Refer to this 
code](https://github.com/apache/pulsar-client-go/blob/9aeeb4e7bbfcaa4039d66ab2857888be2d6fcaf3/pulsar/consumer_partition.go#L2224)
   
   In simple terms, to minimize interaction with the broker:
   
   1. If `lastDequeuedMsg` has a value, it clearly indicates consumption up to this position, so compare `lastDequeuedMsg` with `lastMessageInBroker`.
   2. If it has no value, compare `startMessageID` with `lastMessageInBroker`.
   
   If the comparison returns **true**, there really are messages. If it returns **false**, that doesn't necessarily mean there are no messages: `lastMessageInBroker` is a cached value, and more messages may already have been written to the topic. Therefore, **`lastMessageInBroker` is fetched from the broker again and the comparison is repeated**.
   
   #### Question-2: What happens when the consumer reconnects?
   
   When the consumer reconnects, it clears all messages in the `receive queue` and **updates `startMessageId` with the first message in the queue**, since this indicates that the user has consumed up to this position.
   
   
   
   #### !!! The logic above works well without a seek operation, but once the user calls seek, the situation becomes more complicated.
   
   #### Question-3: How does the seek operation work?
   
   **For the broker:**
   When the client sends a seek request to the broker, the broker will do the 
following:
   1. Rewind the cursor to the seek position (ID or time).
   2. Send a close request to the consumer.
   3. Send a response indicating that the seek operation is complete.
   
   **For the client:**
   1. Send a seek request and wait for the broker to respond that the seek is complete.
   2. Set `lastDequeuedMsg` to nil, because this value is invalid after the seek. **(Tip: Currently missing, this PR fixes it.)**
   3. Clear the queue so that `startMessageId` is not reset to the first message in the queue upon reconnection. **(Tip: Currently missing, this PR fixes it.)**
   4. If seeking by ID:
       - If the passed ID is a specific position or earliest, set `startMessageId` to the seek ID. **(Tip: Currently missing, this PR fixes it.)**
       - If the passed ID is latest, obtain the topic's `lastMessageId` and set `startMessageId` to it. **(Tip: Currently missing; a separate PR will fix this later, as it [currently doesn't support passing the latest ID directly](https://github.com/apache/pulsar-client-go/blob/9aeeb4e7bbfcaa4039d66ab2857888be2d6fcaf3/pulsar/reader_impl.go#L210-L214).)**
   5. If seeking by time, the client doesn't know which message ID corresponds to the requested time, so it needs to call `getLastMessageId` and set `startMessageId` to the result. **(Tip: Currently missing, this PR fixes it.)**
   6. The seek is complete; return to the user.
   7. BTW: Strictly speaking, this only completes the seek operation itself. Full completion requires the consumer to reconnect to the broker successfully, but this happens in the background and the user doesn't need to be concerned about it. Also, **note that we don't need to wait for the consumer to reconnect to the broker before reporting seek completion to the user**: when the broker responds that the seek is complete, **it has already reset the cursor**, and after receiving that response the client clears the queue, so no earlier messages will arrive. In extremely rare cases (**where data from the old connection's buffer re-enters the queue after it has been cleared**), [these messages are filtered out by comparing their message IDs against startMessageId](https://github.com/apache/pulsar-client-go/blob/9aeeb4e7bbfcaa4039d66ab2857888be2d6fcaf3/pulsar/consumer_partition.go#L1475-L1486).
   
   
   ### Modifications
   
   Run the tests `TestReaderWithSeekByID` and `TestReaderWithSeekByTime` to see the user-facing issues this PR fixes. In short, calling `hasNext` after a seek should now return the expected result.
   
   
   
   ### Verifying this change
   1. Added `TestReaderWithSeekByID` and `TestReaderWithSeekByTime` to cover this change.
   2. No other tests were changed; the existing tests still pass, which indicates that this change does not introduce any breaking changes.
   
   
   ### Does this pull request potentially affect one of the following parts:
   - No
   
   ### Documentation
   - No

