This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 179060f [Issue 527] fix send goroutine blocked (#530)
179060f is described below
commit 179060fa145ec52f58e67202942b05b271727b65
Author: jiangbo9510 <[email protected]>
AuthorDate: Mon Jul 19 11:54:31 2021 +0800
[Issue 527] fix send goroutine blocked (#530)
* fix failTimeoutMessages() polling the wrong item from the blocking queue
* add leave out lock
Co-authored-by: boojiang <[email protected]>
Co-authored-by: xiaolongran <[email protected]>
---
pulsar/internal/blocking_queue.go | 17 +++++++++++++++++
pulsar/producer_partition.go | 30 +++++++++++++++++++++++-------
2 files changed, 40 insertions(+), 7 deletions(-)
diff --git a/pulsar/internal/blocking_queue.go b/pulsar/internal/blocking_queue.go
index 8162301..b44ec16 100644
--- a/pulsar/internal/blocking_queue.go
+++ b/pulsar/internal/blocking_queue.go
@@ -32,6 +32,9 @@ type BlockingQueue interface {
// Poll dequeue one item, return nil if queue is empty
Poll() interface{}
+ // CompareAndPoll compare the first item and poll it if meet the
conditions
+ CompareAndPoll(compare func(item interface{}) bool) interface{}
+
// Peek return the first item without dequeing, return nil if queue is
empty
Peek() interface{}
@@ -117,6 +120,20 @@ func (bq *blockingQueue) Poll() interface{} {
return bq.dequeue()
}
+func (bq *blockingQueue) CompareAndPoll(compare func(interface{}) bool)
interface{} {
+ bq.mutex.Lock()
+ defer bq.mutex.Unlock()
+
+ if bq.size == 0 {
+ return nil
+ }
+
+ if compare(bq.items[bq.headIdx]) {
+ return bq.dequeue()
+ }
+ return nil
+}
+
func (bq *blockingQueue) Peek() interface{} {
bq.mutex.Lock()
defer bq.mutex.Unlock()
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 002d261..8b3d33d 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -515,21 +515,37 @@ func (p *partitionProducer) failTimeoutMessages() {
// iterate at most viewSize items
for i := 0; i < viewSize; i++ {
- item := p.pendingQueue.Poll()
+ tickerNeedWaiting := time.Duration(0)
+ item := p.pendingQueue.CompareAndPoll(
+ func(m interface{}) bool {
+ if m == nil {
+ return false
+ }
+
+ pi := m.(*pendingItem)
+ pi.Lock()
+ defer pi.Unlock()
+ if nextWaiting := diff(pi.sentAt);
nextWaiting > 0 {
+ // current and subsequent items
not timeout yet, stop iterating
+ tickerNeedWaiting = nextWaiting
+ return false
+ }
+ return true
+ })
+
if item == nil {
t.Reset(p.options.SendTimeout)
break
}
- pi := item.(*pendingItem)
- pi.Lock()
- if nextWaiting := diff(pi.sentAt); nextWaiting > 0 {
- // current and subsequent items not timeout
yet, stop iterating
- t.Reset(nextWaiting)
- pi.Unlock()
+ if tickerNeedWaiting > 0 {
+ t.Reset(tickerNeedWaiting)
break
}
+ pi := item.(*pendingItem)
+ pi.Lock()
+
for _, i := range pi.sendRequests {
sr := i.(*sendRequest)
if sr.msg != nil {