This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e48f3b  Fix discard unacked messages (#413)
0e48f3b is described below

commit 0e48f3b53ee21f0b2807662bb7ecdc955aa181ae
Author: hrsakai <[email protected]>
AuthorDate: Sat Dec 12 17:47:45 2020 +0900

    Fix discard unacked messages (#413)
    
    https://github.com/apache/pulsar-client-go/issues/389#issuecomment-741560624
    
    
    ### Motivation
    When a consumer whose receive queue is not empty reconnects to a broker, 
unacked messages don't be redelivered to a client application and these are 
removed from backlog.
    This is because StartMessageID is updated and the consumer implicitly acks 
redelivered unacked messages.
    
https://github.com/apache/pulsar-client-go/blob/v0.3.0/pulsar/consumer_partition.go#L972-L976
    
https://github.com/apache/pulsar-client-go/blob/v0.3.0/pulsar/consumer_partition.go#L507-L510
    
    
    
    ### Modifications
    * If startMessageID is undefined, it will not be updated in order to 
prevent  unacked messages from being discarded by following logic.
    
.https://github.com/apache/pulsar-client-go/blob/v0.3.0/pulsar/consumer_partition.go#L507-L510
---
 pulsar/consumer_partition.go | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index d34e0eb..285cf29 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -931,6 +931,10 @@ func (pc *partitionConsumer) clearQueueAndGetNextMessage() 
trackingMessageID {
 func (pc *partitionConsumer) clearReceiverQueue() trackingMessageID {
        nextMessageInQueue := pc.clearQueueAndGetNextMessage()
 
+       if pc.startMessageID.Undefined() {
+               return pc.startMessageID
+       }
+
        if !nextMessageInQueue.Undefined() {
                return getPreviousMessage(nextMessageInQueue)
        } else if !pc.lastDequeuedMsg.Undefined() {

Reply via email to