This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new d80a722 fix issue 650,different handle sequence value (#651)
d80a722 is described below
commit d80a722ac1ab197c7e8649efdeb1e16356cbb3bb
Author: baomingyu <[email protected]>
AuthorDate: Thu Nov 4 10:57:21 2021 +0800
fix issue 650,different handle sequence value (#651)
* fix issue 650,different handle sequence value
* add sequenceId equal check
---
pulsar/producer_partition.go | 76 ++++++++++++++++++++++----------------------
1 file changed, 38 insertions(+), 38 deletions(-)
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index b2b9273..d67c0c0 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -784,7 +784,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
return
}
- if pi.sequenceID != response.GetSequenceId() {
+ if pi.sequenceID < response.GetSequenceId() {
// if we receive a receipt that is not the one expected, the state of the broker and the producer differs.
// At that point, it is better to close the connection to the broker to reconnect to a broker hopping it solves
// the state discrepancy.
@@ -792,49 +792,49 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
response.GetSequenceId(), pi.sequenceID)
p.cnx.Close()
return
- }
-
- // The ack was indeed for the expected item in the queue, we can remove it and trigger the callback
- p.pendingQueue.Poll()
-
- now := time.Now().UnixNano()
-
- // lock the pending item while sending the requests
- pi.Lock()
- defer pi.Unlock()
- p.metrics.PublishRPCLatency.Observe(float64(now-pi.sentAt.UnixNano()) / 1.0e9)
- for idx, i := range pi.sendRequests {
- sr := i.(*sendRequest)
- if sr.msg != nil {
- atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID))
- p.publishSemaphore.Release()
+ } else if pi.sequenceID == response.GetSequenceId() {
+ // The ack was indeed for the expected item in the queue, we can remove it and trigger the callback
+ p.pendingQueue.Poll()
+
+ now := time.Now().UnixNano()
+
+ // lock the pending item while sending the requests
+ pi.Lock()
+ defer pi.Unlock()
+ p.metrics.PublishRPCLatency.Observe(float64(now-pi.sentAt.UnixNano()) / 1.0e9)
+ for idx, i := range pi.sendRequests {
+ sr := i.(*sendRequest)
+ if sr.msg != nil {
+ atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID))
+ p.publishSemaphore.Release()
+
+ p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9)
+ p.metrics.MessagesPublished.Inc()
+ p.metrics.MessagesPending.Dec()
+ payloadSize := float64(len(sr.msg.Payload))
+ p.metrics.BytesPublished.Add(payloadSize)
+ p.metrics.BytesPending.Sub(payloadSize)
+ }
- p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9)
- p.metrics.MessagesPublished.Inc()
- p.metrics.MessagesPending.Dec()
- payloadSize := float64(len(sr.msg.Payload))
- p.metrics.BytesPublished.Add(payloadSize)
- p.metrics.BytesPending.Sub(payloadSize)
- }
+ if sr.callback != nil || len(p.options.Interceptors) > 0 {
+ msgID := newMessageID(
+ int64(response.MessageId.GetLedgerId()),
+ int64(response.MessageId.GetEntryId()),
+ int32(idx),
+ p.partitionIdx,
+ )
- if sr.callback != nil || len(p.options.Interceptors) > 0 {
- msgID := newMessageID(
- int64(response.MessageId.GetLedgerId()),
- int64(response.MessageId.GetEntryId()),
- int32(idx),
- p.partitionIdx,
- )
+ if sr.callback != nil {
+ sr.callback(msgID, sr.msg, nil)
+ }
- if sr.callback != nil {
- sr.callback(msgID, sr.msg, nil)
+ p.options.Interceptors.OnSendAcknowledgement(p, sr.msg, msgID)
}
-
- p.options.Interceptors.OnSendAcknowledgement(p, sr.msg, msgID)
}
- }
- // Mark this pending item as done
- pi.Complete()
+ // Mark this pending item as done
+ pi.Complete()
+ }
}
func (p *partitionProducer) internalClose(req *closeProducer) {