This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 179060f  [Issue 527] fix send goroutine blocked (#530)
179060f is described below

commit 179060fa145ec52f58e67202942b05b271727b65
Author: jiangbo9510 <[email protected]>
AuthorDate: Mon Jul 19 11:54:31 2021 +0800

    [Issue 527] fix send goroutine blocked (#530)
    
    * fix failTimeoutMessages() poll wrong item from block queue
    
    * add leave out lock
    
    Co-authored-by: boojiang <[email protected]>
    Co-authored-by: xiaolongran <[email protected]>
---
 pulsar/internal/blocking_queue.go | 17 +++++++++++++++++
 pulsar/producer_partition.go      | 30 +++++++++++++++++++++++-------
 2 files changed, 40 insertions(+), 7 deletions(-)

diff --git a/pulsar/internal/blocking_queue.go 
b/pulsar/internal/blocking_queue.go
index 8162301..b44ec16 100644
--- a/pulsar/internal/blocking_queue.go
+++ b/pulsar/internal/blocking_queue.go
@@ -32,6 +32,9 @@ type BlockingQueue interface {
        // Poll dequeue one item, return nil if queue is empty
        Poll() interface{}
 
+       // CompareAndPoll compare the first item and poll it if meet the 
conditions
+       CompareAndPoll(compare func(item interface{}) bool) interface{}
+
        // Peek return the first item without dequeing, return nil if queue is 
empty
        Peek() interface{}
 
@@ -117,6 +120,20 @@ func (bq *blockingQueue) Poll() interface{} {
        return bq.dequeue()
 }
 
+func (bq *blockingQueue) CompareAndPoll(compare func(interface{}) bool) 
interface{} {
+       bq.mutex.Lock()
+       defer bq.mutex.Unlock()
+
+       if bq.size == 0 {
+               return nil
+       }
+
+       if compare(bq.items[bq.headIdx]) {
+               return bq.dequeue()
+       }
+       return nil
+}
+
 func (bq *blockingQueue) Peek() interface{} {
        bq.mutex.Lock()
        defer bq.mutex.Unlock()
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 002d261..8b3d33d 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -515,21 +515,37 @@ func (p *partitionProducer) failTimeoutMessages() {
 
                // iterate at most viewSize items
                for i := 0; i < viewSize; i++ {
-                       item := p.pendingQueue.Poll()
+                       tickerNeedWaiting := time.Duration(0)
+                       item := p.pendingQueue.CompareAndPoll(
+                               func(m interface{}) bool {
+                                       if m == nil {
+                                               return false
+                                       }
+
+                                       pi := m.(*pendingItem)
+                                       pi.Lock()
+                                       defer pi.Unlock()
+                                       if nextWaiting := diff(pi.sentAt); 
nextWaiting > 0 {
+                                               // current and subsequent items 
not timeout yet, stop iterating
+                                               tickerNeedWaiting = nextWaiting
+                                               return false
+                                       }
+                                       return true
+                               })
+
                        if item == nil {
                                t.Reset(p.options.SendTimeout)
                                break
                        }
 
-                       pi := item.(*pendingItem)
-                       pi.Lock()
-                       if nextWaiting := diff(pi.sentAt); nextWaiting > 0 {
-                               // current and subsequent items not timeout 
yet, stop iterating
-                               t.Reset(nextWaiting)
-                               pi.Unlock()
+                       if tickerNeedWaiting > 0 {
+                               t.Reset(tickerNeedWaiting)
                                break
                        }
 
+                       pi := item.(*pendingItem)
+                       pi.Lock()
+
                        for _, i := range pi.sendRequests {
                                sr := i.(*sendRequest)
                                if sr.msg != nil {

Reply via email to