315157973 opened a new pull request #10190:
URL: https://github.com/apache/pulsar/pull/10190


   Fixes #10152
   
   ### Motivation
   
   ##### 1
   When the connection is reset, messages received by the reader may be 
duplicated, so I removed the check that each key is unique.
   Scenario that produces the duplicate:
   
   start to read -> 1255:0 threadId:68
   after read,update read position -> 1255:1 threadId:68
   start to read -> 1255:1 threadId:68
   
   [pulsar-external-listener-39-1] WARN  
org.apache.pulsar.client.impl.ConsumerImpl - 
[persistent://my-property/my-ns/my-reader-topic-with-batching-inclusivebdcf4a3e-6c2e-4d36-b362-52c066d14da0-partition-2]
 [d975f7e2-7b8f-467a-a56b-b3c1e96ec75f] Could not get connection while 
getLastMessageId -- Will try again in 190 ms
   
   [pulsar-external-listener-39-1] INFO  
org.apache.pulsar.client.impl.ConsumerImpl - 
[persistent://my-property/my-ns/my-reader-topic-with-batching-inclusivebdcf4a3e-6c2e-4d36-b362-52c066d14da0-partition-2][d975f7e2-7b8f-467a-a56b-b3c1e96ec75f]
 Get topic last message Id
   
   [pulsar-client-io-38-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - 
[persistent://my-property/my-ns/my-reader-topic-with-batching-inclusivebdcf4a3e-6c2e-4d36-b362-52c066d14da0-partition-2][d975f7e2-7b8f-467a-a56b-b3c1e96ec75f]
 Successfully getLastMessageId 1255:0
   
   [pulsar-client-io-38-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - 
[persistent://my-property/my-ns/my-reader-topic-with-batching-inclusivebdcf4a3e-6c2e-4d36-b362-52c066d14da0-partition-2][d975f7e2-7b8f-467a-a56b-b3c1e96ec75f]
 **Seek subscription to the message 1255:0:2:4**
   
   ##### 2
   While fixing bug 1, I found bug 2.
   Normal situation:
   1) ConsumerImpl.hasMessageAvailableAsync triggers seek, and seek then 
triggers a reconnection
   2) The reconnection triggers connectionOpened; at this point startMessageId = 
clearReceiverQueue() = seekMessageId
   3) ConsumerImpl reconnects to the broker with lastDequeuedMessageId=earliest 
and startMessageId=seekMessageId
   4) hasMessageAvailableAsync returns true
   5) The broker returns a message
   6) The consumer receives the message
   
   Abnormal situation:
   1) ConsumerImpl.hasMessageAvailableAsync triggers seek, and seek then 
triggers a reconnection
   2) The reconnection triggers connectionOpened; at this point startMessageId = 
clearReceiverQueue() = seekMessageId
   3) ConsumerImpl reconnects to the broker with lastDequeuedMessageId=earliest 
and startMessageId=seekMessageId
   4) hasMessageAvailableAsync returns true
   5) The broker returns a message
   6) For some other reason the connection drops again, and steps 2, 3, and 4 
run again.
   When step 2 runs again, the message in the incoming queue is cleared. 
Because startMessageId is no longer equal to latest, seek is not triggered 
again, so the cleared message can never be read.
   
   Currently, seek is triggered only when all of the following hold:
   lastDequeuedMessageId == MessageId.earliest
   and
   startMessageId.equals(MessageId.latest)
   and
   resetIncludeHead == true
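
   The three-part condition above can be sketched as a standalone predicate. This is only an illustration, not the actual ConsumerImpl code: the class `SeekTriggerSketch`, the `Position` enum, and the method `shouldSeek` are hypothetical names, and the enum stands in for `MessageId.earliest`, `MessageId.latest`, and any concrete message id such as seekMessageId.

```java
public class SeekTriggerSketch {
    /** Hypothetical stand-in for the kinds of message id discussed above. */
    public enum Position { EARLIEST, LATEST, SPECIFIC }

    /** Mirrors the condition listed above; an assumption, not the real client code. */
    public static boolean shouldSeek(Position lastDequeuedMessageId,
                                     Position startMessageId,
                                     boolean resetIncludeHead) {
        return lastDequeuedMessageId == Position.EARLIEST
                && startMessageId == Position.LATEST
                && resetIncludeHead;
    }

    public static void main(String[] args) {
        // First call: the reader still starts at latest, so seek is triggered.
        System.out.println(shouldSeek(Position.EARLIEST, Position.LATEST, true));
        // After a reconnection, startMessageId holds seekMessageId (a specific id),
        // so the condition no longer holds and no second seek happens.
        System.out.println(shouldSeek(Position.EARLIEST, Position.SPECIFIC, true));
    }
}
```

   Under these assumptions the second call returns false, which matches the abnormal situation: once startMessageId has been replaced by seekMessageId, a later disconnect clears the queue without triggering another seek.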
   
   ### Modifications
   1) Remove `assertTrue(set.remove(key))` in the unit test, because messages may 
be duplicated
   2) Add seek in hasMessageAvailableAsync
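
   As a rough illustration of modification 1, a check that tolerates redelivered keys might look like the sketch below. The class `DuplicateTolerantCheck`, the method `allKeysSeen`, and the sample keys are hypothetical and are not taken from the actual test; the sketch only shows the idea of calling `remove` without asserting its return value.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DuplicateTolerantCheck {
    /** Returns true if every expected key was received at least once; duplicates are ignored. */
    public static boolean allKeysSeen(Set<String> expectedKeys, List<String> receivedKeys) {
        Set<String> remaining = new HashSet<>(expectedKeys);
        for (String key : receivedKeys) {
            // Asserting on the result of remove(key) would fail for a redelivered key,
            // so the return value is deliberately ignored here.
            remaining.remove(key);
        }
        return remaining.isEmpty();
    }

    public static void main(String[] args) {
        Set<String> expected = new HashSet<>(Arrays.asList("key0", "key1", "key2"));
        // "key1" arrives twice, as can happen after a connection reset.
        List<String> received = Arrays.asList("key0", "key1", "key1", "key2");
        System.out.println(allKeysSeen(expected, received));
    }
}
```

   The check still fails when an expected key never arrives, so it continues to guard against lost messages while accepting duplicates.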
   

