315157973 opened a new pull request #10190: URL: https://github.com/apache/pulsar/pull/10190
Fixes #10152

### Motivation

##### 1

When the connection is reset, messages received by the reader may be duplicated, so I removed the check that each key is unique.

Scenario that produces the duplicate:

start to read -> 1255:0 threadId:68
after read, update read position -> 1255:1 threadId:68
start to read -> 1255:1 threadId:68
[pulsar-external-listener-39-1] WARN org.apache.pulsar.client.impl.ConsumerImpl - [persistent://my-property/my-ns/my-reader-topic-with-batching-inclusivebdcf4a3e-6c2e-4d36-b362-52c066d14da0-partition-2] [d975f7e2-7b8f-467a-a56b-b3c1e96ec75f] Could not get connection while getLastMessageId -- Will try again in 190 ms
[pulsar-external-listener-39-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://my-property/my-ns/my-reader-topic-with-batching-inclusivebdcf4a3e-6c2e-4d36-b362-52c066d14da0-partition-2][d975f7e2-7b8f-467a-a56b-b3c1e96ec75f] Get topic last message Id
[pulsar-client-io-38-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://my-property/my-ns/my-reader-topic-with-batching-inclusivebdcf4a3e-6c2e-4d36-b362-52c066d14da0-partition-2][d975f7e2-7b8f-467a-a56b-b3c1e96ec75f] Successfully getLastMessageId 1255:0
[pulsar-client-io-38-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://my-property/my-ns/my-reader-topic-with-batching-inclusivebdcf4a3e-6c2e-4d36-b362-52c066d14da0-partition-2][d975f7e2-7b8f-467a-a56b-b3c1e96ec75f] **Seek subscription to the message 1255:0:2:4**

##### 2

While fixing bug 1, I found bug 2.

Normal situation:

1) `ConsumerImpl.hasMessageAvailableAsync` triggers seek, and seek then triggers a reconnection
2) The reconnection triggers `connectionOpened`; at this point startMessageId = clearReceiverQueue() = seekMessageId
3) ConsumerImpl reconnects. Broker lastDequeuedMessageId=earliest, startMessageId=seekMessageId
4) `hasMessageAvailableAsync` returns true
5) The broker returns a message
6) The consumer receives the message

Abnormal situation:

1) `ConsumerImpl.hasMessageAvailableAsync` triggers seek, and seek then triggers a reconnection
2) The reconnection triggers `connectionOpened`; at this point startMessageId = clearReceiverQueue() = seekMessageId
3) ConsumerImpl reconnects. Broker lastDequeuedMessageId=earliest, startMessageId=seekMessageId
4) `hasMessageAvailableAsync` returns true
5) The broker returns a message
6) For some other reason the connection drops again, and steps 2, 3, and 4 are triggered again. When step 2 runs, the messages in incomeQueue are cleared. Because startMessageId is no longer equal to latest, seek is not triggered again, so the cleared messages can never be read.

Currently, seek is triggered only when all of the following hold: `lastDequeuedMessageId == MessageId.earliest`, `startMessageId.equals(MessageId.latest)`, and `resetIncludeHead == true`.

### Modifications

1) Remove `assertTrue(set.remove(key))` from the unit test, because messages may be duplicated
2) Add seek in `hasMessageAvailableAsync`
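
To make the seek condition quoted in the description easier to follow, here is a minimal standalone model of it. This is only a sketch: `SeekConditionSketch`, `shouldSeek`, and the string stand-ins for message IDs are hypothetical names for illustration and are not the actual `ConsumerImpl` code.

```java
public class SeekConditionSketch {
    // Hypothetical string stand-ins for MessageId.earliest and MessageId.latest.
    static final String EARLIEST = "earliest";
    static final String LATEST = "latest";

    /**
     * Mirrors the three-part condition from the description.
     * Assumption: this models the check only; it is not the real client code.
     */
    static boolean shouldSeek(String lastDequeuedMessageId,
                              String startMessageId,
                              boolean resetIncludeHead) {
        return EARLIEST.equals(lastDequeuedMessageId)
                && LATEST.equals(startMessageId)
                && resetIncludeHead;
    }

    public static void main(String[] args) {
        // First call: nothing dequeued yet and the reader starts at latest (inclusive).
        System.out.println(shouldSeek(EARLIEST, LATEST, true));   // prints true

        // After the first seek and reconnect, startMessageId holds the seek position,
        // so a second disconnect no longer satisfies the condition.
        System.out.println(shouldSeek(EARLIEST, "1255:0", true)); // prints false
    }
}
```

The second call corresponds to step 6 of the abnormal situation: the receiver queue has been cleared, but the condition can no longer become true, which is why the PR adds a seek in `hasMessageAvailableAsync`.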

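
The test-side change can be pictured with a duplicate-tolerant check. The sketch below assumes the test tracks expected keys in a set; `DuplicateTolerantCheck` and `allKeysReceived` are hypothetical names, and the real unit test may be structured differently.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DuplicateTolerantCheck {
    /** Returns true if every expected key was seen at least once; duplicates are ignored. */
    static boolean allKeysReceived(Set<String> expectedKeys, List<String> receivedKeys) {
        Set<String> remaining = new HashSet<>(expectedKeys);
        for (String key : receivedKeys) {
            // remove() returns false for a redelivered key. Unlike
            // assertTrue(set.remove(key)), that result is deliberately not asserted.
            remaining.remove(key);
        }
        return remaining.isEmpty();
    }

    public static void main(String[] args) {
        Set<String> expected = new HashSet<>(Arrays.asList("key0", "key1", "key2"));
        // "key1" arrives twice, as it could after a connection reset.
        List<String> received = Arrays.asList("key0", "key1", "key1", "key2");
        System.out.println(allKeysReceived(expected, received)); // prints true
    }
}
```

This still fails when a key is never delivered, so message loss is caught while redelivery is tolerated.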