This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 98338e5 fix resend pendingItems race condition (#551)
98338e5 is described below
commit 98338e544d31570504cc06f4da63da47cbb8e628
Author: Rui Fu <[email protected]>
AuthorDate: Thu Jun 24 16:36:49 2021 +0800
fix resend pendingItems race condition (#551)
---
pulsar/producer_partition.go | 42 +++++++++++++++++++++++++++++++++---------
1 file changed, 33 insertions(+), 9 deletions(-)
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index b365f83..19aa06e 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -231,10 +231,29 @@ func (p *partitionProducer) grabCnx() error {
p.log.WithField("cnx", res.Cnx.ID()).Debug("Connected producer")
pendingItems := p.pendingQueue.ReadableSlice()
- if len(pendingItems) > 0 {
- p.log.Infof("Resending %d pending batches", len(pendingItems))
- for _, pi := range pendingItems {
- p.cnx.WriteData(pi.(*pendingItem).batchData)
+ viewSize := len(pendingItems)
+ if viewSize > 0 {
+ p.log.Infof("Resending %d pending batches", viewSize)
+ lastViewItem := pendingItems[viewSize-1].(*pendingItem)
+
+ // iterate over at most viewSize pending items
+ for i := 0; i < viewSize; i++ {
+ item := p.pendingQueue.Poll()
+ if item == nil {
+ continue
+ }
+ pi := item.(*pendingItem)
+ // when resending pending batches, we update the sentAt timestamp and put to the back of queue
+ // to avoid pending item being removed by failTimeoutMessages and causing a race condition
+ pi.Lock()
+ pi.sentAt = time.Now()
+ pi.Unlock()
+ p.pendingQueue.Put(pi)
+ p.cnx.WriteData(pi.batchData)
+
+ if pi == lastViewItem {
+ break
+ }
}
}
return nil
@@ -523,8 +542,7 @@ func (p *partitionProducer) failTimeoutMessages() {
}
// flag the send has completed with error, flush makes no effect
- pi.completed = true
- buffersPool.Put(pi.batchData)
+ pi.Complete()
pi.Unlock()
// finally reached the last view item, current iteration ends
@@ -706,9 +724,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
}
// Mark this pending item as done
- pi.completed = true
- // Return buffer to the pool since we're now done using it
- buffersPool.Put(pi.batchData)
+ pi.Complete()
}
func (p *partitionProducer) internalClose(req *closeProducer) {
@@ -800,3 +816,11 @@ type flushRequest struct {
waitGroup *sync.WaitGroup
err error
}
+
+func (i *pendingItem) Complete() {
+ if i.completed {
+ return
+ }
+ i.completed = true
+ buffersPool.Put(i.batchData)
+}