This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 0e48f3b Fix discard unacked messages (#413)
0e48f3b is described below
commit 0e48f3b53ee21f0b2807662bb7ecdc955aa181ae
Author: hrsakai <[email protected]>
AuthorDate: Sat Dec 12 17:47:45 2020 +0900
Fix discard unacked messages (#413)
https://github.com/apache/pulsar-client-go/issues/389#issuecomment-741560624
### Motivation
When a consumer whose receive queue is not empty reconnects to a broker,
unacked messages are not redelivered to the client application and are
removed from the backlog.
This happens because startMessageID is updated on reconnect, so the consumer
implicitly acks the unacked messages that the broker redelivers.
https://github.com/apache/pulsar-client-go/blob/v0.3.0/pulsar/consumer_partition.go#L972-L976
https://github.com/apache/pulsar-client-go/blob/v0.3.0/pulsar/consumer_partition.go#L507-L510
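
To make the failure mode concrete, here is a minimal sketch of the filter described above. It is not the client's actual code: `msgID`, `undefinedID`, `shouldDiscard`, and `deliver` are hypothetical stand-ins, message IDs are modeled as plain integers, and the non-inclusive `<=` comparison is an assumption.

```go
package main

import "fmt"

// msgID is a hypothetical, simplified stand-in for the client's message ID type.
type msgID int64

// undefinedID models an unset start position (assumption for this sketch).
const undefinedID msgID = -1

// shouldDiscard models the start-position filter: once a start message ID is
// set, any received message at or before it is treated as already consumed.
func shouldDiscard(start, received msgID) bool {
	if start == undefinedID {
		return false
	}
	return received <= start
}

// deliver returns the messages the application would still see after a
// reconnect. Discarded messages are implicitly acked, so they leave the backlog.
func deliver(start msgID, redelivered []msgID) []msgID {
	var out []msgID
	for _, id := range redelivered {
		if shouldDiscard(start, id) {
			continue
		}
		out = append(out, id)
	}
	return out
}

func main() {
	// Messages 5 and 6 were handed to the application but never acked;
	// 7 and 8 were still waiting in the receive queue at reconnect time.
	redelivered := []msgID{5, 6, 7, 8}

	// Start ID moved to the message before the queue head: 5 and 6 are lost.
	fmt.Println(deliver(6, redelivered))

	// Start ID left undefined: every unacked message is delivered again.
	fmt.Println(deliver(undefinedID, redelivered))
}
```

In this model, moving the start position forward on reconnect is exactly what turns a redelivery into a silent ack.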
### Modifications
* If startMessageID is undefined, it is not updated, so that the following logic
  does not discard unacked messages:
  https://github.com/apache/pulsar-client-go/blob/v0.3.0/pulsar/consumer_partition.go#L507-L510
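
The effect of the guard can be sketched with a stripped-down model. The `consumer` type, the integer `msgID`, and `undefinedID` below are hypothetical simplifications, `next - 1` stands in for `getPreviousMessage`, and the final fallback branch is an assumption because the diff does not show it.

```go
package main

import "fmt"

// msgID is a hypothetical integer stand-in for trackingMessageID.
type msgID int64

// undefinedID models an unset message ID (assumption for this sketch).
const undefinedID msgID = -1

// Undefined reports whether the ID is unset.
func (m msgID) Undefined() bool { return m == undefinedID }

// consumer is a simplified stand-in for partitionConsumer.
type consumer struct {
	startMessageID  msgID
	lastDequeuedMsg msgID
	queue           []msgID
}

// clearQueueAndGetNextMessage drains the queue and returns the ID that was at
// its head, or undefinedID if the queue was empty.
func (c *consumer) clearQueueAndGetNextMessage() msgID {
	next := undefinedID
	if len(c.queue) > 0 {
		next = c.queue[0]
	}
	c.queue = nil
	return next
}

// clearReceiverQueue mirrors the patched control flow: the queue is always
// cleared, but an undefined start ID is returned unchanged.
func (c *consumer) clearReceiverQueue() msgID {
	next := c.clearQueueAndGetNextMessage()

	// Guard added by this commit.
	if c.startMessageID.Undefined() {
		return c.startMessageID
	}

	if !next.Undefined() {
		return next - 1 // stand-in for getPreviousMessage
	} else if !c.lastDequeuedMsg.Undefined() {
		return c.lastDequeuedMsg
	}
	return c.startMessageID // assumed fallback, not shown in the diff
}

func main() {
	sub := &consumer{startMessageID: undefinedID, lastDequeuedMsg: 6, queue: []msgID{7, 8}}
	fmt.Println(sub.clearReceiverQueue().Undefined())

	seeked := &consumer{startMessageID: 3, lastDequeuedMsg: 6, queue: []msgID{7, 8}}
	fmt.Println(seeked.clearReceiverQueue())
}
```

With an undefined start ID the result stays undefined, so nothing is filtered after reconnect; with a defined start ID the position still advances to the message before the queue head.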
---
pulsar/consumer_partition.go | 4 ++++
1 file changed, 4 insertions(+)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index d34e0eb..285cf29 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -931,6 +931,10 @@ func (pc *partitionConsumer) clearQueueAndGetNextMessage() trackingMessageID {
 func (pc *partitionConsumer) clearReceiverQueue() trackingMessageID {
 	nextMessageInQueue := pc.clearQueueAndGetNextMessage()
+	if pc.startMessageID.Undefined() {
+		return pc.startMessageID
+	}
+
 	if !nextMessageInQueue.Undefined() {
 		return getPreviousMessage(nextMessageInQueue)
 	} else if !pc.lastDequeuedMsg.Undefined() {