This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c7fd4a  Fix producer state by reconnecting when receiving unexpected 
receipts (#335) (#336)
0c7fd4a is described below

commit 0c7fd4a417edc45c40db59a4ff2d0371cacf6c8b
Author: Denis Vergnes <[email protected]>
AuthorDate: Mon Jul 27 16:22:18 2020 -0700

    Fix producer state by reconnecting when receiving unexpected receipts 
(#335) (#336)
---
 pulsar/producer_partition.go | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 18f22f6..a16193f 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -458,13 +458,21 @@ func (p *partitionProducer) ReceivedSendReceipt(response 
*pb.CommandSendReceipt)
        pi, ok := p.pendingQueue.Peek().(*pendingItem)
 
        if !ok {
-               p.log.Warnf("Received ack for %v although the pending queue is 
empty", response.GetMessageId())
+               // if we receive a receipt although the pending queue is empty, 
the state of the broker and the producer differs.
+               // At that point, it is better to close the connection to the 
broker to reconnect to a broker hopping it solves
+               // the state discrepancy.
+               p.log.Warnf("Received ack for %v although the pending queue is 
empty, closing connection", response.GetMessageId())
+               p.cnx.Close()
                return
        }
 
        if pi.sequenceID != response.GetSequenceId() {
-               p.log.Warnf("Received ack for %v on sequenceId %v - expected: 
%v", response.GetMessageId(),
+               // if we receive a receipt that is not the one expected, the 
state of the broker and the producer differs.
+               // At that point, it is better to close the connection to the 
broker to reconnect to a broker hopping it solves
+               // the state discrepancy.
+               p.log.Warnf("Received ack for %v on sequenceId %v - expected: 
%v, closing connection", response.GetMessageId(),
                        response.GetSequenceId(), pi.sequenceID)
+               p.cnx.Close()
                return
        }
 

Reply via email to