This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 85d7661 Fix ack timeout cause reconnect (#756)
85d7661 is described below
commit 85d76615dea35e24e4699d6dc54be216aeb89499
Author: xiaolong ran <[email protected]>
AuthorDate: Wed Apr 6 11:36:02 2022 +0800
Fix ack timeout cause reconnect (#756)
* Fix ack timeout cause reconnect
Signed-off-by: xiaolongran <[email protected]>
* fix some logic
Signed-off-by: xiaolongran <[email protected]>
---
pulsar/producer_partition.go | 16 ++++++++--------
1 file changed, 8 insertions(+), 8 deletions(-)
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index d031e9a..525b89c 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -780,22 +780,22 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
if !ok {
// if we receive a receipt although the pending queue is empty, the state of the broker and the producer differs.
- // At that point, it is better to close the connection to the
broker to reconnect to a broker hopping it solves
- // the state discrepancy.
- p.log.Warnf("Received ack for %v although the pending queue is
empty, closing connection", response.GetMessageId())
- p._getConn().Close()
+ p.log.Warnf("Got ack %v for timed out msg",
response.GetMessageId())
return
}
if pi.sequenceID < response.GetSequenceId() {
- // if we receive a receipt that is not the one expected, the
state of the broker and the producer differs.
- // At that point, it is better to close the connection to the
broker to reconnect to a broker hopping it solves
- // the state discrepancy.
+ // Ignoring the ack since it's referring to a message that has already timed out.
+ p.log.Warnf("Received ack for %v on sequenceId %v - expected:
%v, closing connection", response.GetMessageId(),
+ response.GetSequenceId(), pi.sequenceID)
+ return
+ } else if pi.sequenceID > response.GetSequenceId() {
+ // Force connection closing so that messages can be re-transmitted in a new connection
p.log.Warnf("Received ack for %v on sequenceId %v - expected:
%v, closing connection", response.GetMessageId(),
response.GetSequenceId(), pi.sequenceID)
p._getConn().Close()
return
- } else if pi.sequenceID == response.GetSequenceId() {
+ } else {
// The ack was indeed for the expected item in the queue, we can remove it and trigger the callback
p.pendingQueue.Poll()