This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 0c7fd4a Fix producer state by reconnecting when receiving unexpected
receipts (#335) (#336)
0c7fd4a is described below
commit 0c7fd4a417edc45c40db59a4ff2d0371cacf6c8b
Author: Denis Vergnes <[email protected]>
AuthorDate: Mon Jul 27 16:22:18 2020 -0700
Fix producer state by reconnecting when receiving unexpected receipts
(#335) (#336)
---
pulsar/producer_partition.go | 12 ++++++++++--
1 file changed, 10 insertions(+), 2 deletions(-)
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 18f22f6..a16193f 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -458,13 +458,21 @@ func (p *partitionProducer) ReceivedSendReceipt(response
*pb.CommandSendReceipt)
pi, ok := p.pendingQueue.Peek().(*pendingItem)
if !ok {
- p.log.Warnf("Received ack for %v although the pending queue is
empty", response.GetMessageId())
+ // If we receive a receipt although the pending queue is empty,
the states of the broker and the producer differ.
+ // At that point, it is better to close the connection to the
broker and reconnect, hoping that this resolves
+ // the state discrepancy.
+ p.log.Warnf("Received ack for %v although the pending queue is
empty, closing connection", response.GetMessageId())
+ p.cnx.Close()
return
}
if pi.sequenceID != response.GetSequenceId() {
- p.log.Warnf("Received ack for %v on sequenceId %v - expected:
%v", response.GetMessageId(),
+ // If we receive a receipt that is not the one expected, the
states of the broker and the producer differ.
+ // At that point, it is better to close the connection to the
broker and reconnect, hoping that this resolves
+ // the state discrepancy.
+ p.log.Warnf("Received ack for %v on sequenceId %v - expected:
%v, closing connection", response.GetMessageId(),
response.GetSequenceId(), pi.sequenceID)
+ p.cnx.Close()
return
}