This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 0e76dc2 asynchronized send timeout checking (#460)
0e76dc2 is described below
commit 0e76dc2f6e8f4857d377ac9422e2df5f715b8ae2
Author: wuYin <[email protected]>
AuthorDate: Tue Feb 9 10:41:58 2021 +0800
asynchronized send timeout checking (#460)
Fixes #458
### Motivation
In the current producer, send timeout checking is only triggered by the periodic batch flush.
If the connection is closed, the producer event loop blocks while reconnecting to the broker, so the batch flush, and with it the send timeout check, never runs. The Java client's timer keeps working in this situation.
### Modifications
Make send timeout checking asynchronous by running it in its own goroutine until the producer is closed, without taking a pending queue lock.
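A minimal sketch of the control flow this adds, for readers skimming the diff below: a dedicated goroutine waits on a timer, peeks at the oldest pending message, and either re-arms the timer for the remaining time or fails the expired items. `oldestSentAt` and `closed` are hypothetical stand-ins for the pending-queue peek and producer-state check used in the actual code.

```go
package example

import "time"

// failTimeoutLoop is a simplified sketch of the timer-driven checker; the
// callbacks oldestSentAt and closed are placeholders, not real producer APIs.
func failTimeoutLoop(sendTimeout time.Duration,
	oldestSentAt func() (time.Time, bool), closed func() bool) {
	t := time.NewTimer(sendTimeout)
	defer t.Stop()

	for range t.C {
		if closed() {
			// producer is closing or closed, stop checking
			return
		}
		sentAt, ok := oldestSentAt()
		if !ok {
			// nothing pending, look again after a full timeout
			t.Reset(sendTimeout)
			continue
		}
		if remaining := sendTimeout - time.Since(sentAt); remaining > 0 {
			// oldest message has not expired yet, wake up when it would
			t.Reset(remaining)
			continue
		}
		// expired: fail the timed-out pending items here, then re-arm
		t.Reset(sendTimeout)
	}
}
```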
### Verifying this change
- [x] Make sure that the change passes the CI checks.
### Others
Without a pending queue lock, the send timeout checking becomes more complicated; I am not sure the extra complexity is worth the performance gain.
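To illustrate where that complexity comes from: with no queue-wide lock, the timeout goroutine and the connection receipt handler can both reach the same pending item, so correctness rests on per-item state. A hypothetical sketch of that pattern (illustrative types only, not the repository's `pendingItem`):

```go
package example

import "sync"

// pendingEntry stands in for a queued send: each entry carries its own lock
// and a completed flag, so whichever goroutine reaches it first finishes it
// and the other side sees completed == true and skips it.
type pendingEntry struct {
	sync.Mutex
	completed bool
}

// completeOnce runs done at most once per entry across racing goroutines.
func (e *pendingEntry) completeOnce(done func()) bool {
	e.Lock()
	defer e.Unlock()
	if e.completed {
		return false
	}
	e.completed = true
	done()
	return true
}
```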
---
pulsar/producer_partition.go | 118 +++++++++++++++++++++++++++++--------------
1 file changed, 81 insertions(+), 37 deletions(-)
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 2536909..41a1b9b 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -140,6 +140,9 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
p.log.WithField("cnx", p.cnx.ID()).Info("Created producer")
p.setProducerState(producerReady)
+ if p.options.SendTimeout > 0 {
+ go p.failTimeoutMessages()
+ }
go p.runEventsLoop()
return p, nil
@@ -427,10 +430,6 @@ type pendingItem struct {
}
func (p *partitionProducer) internalFlushCurrentBatch() {
- if p.options.SendTimeout > 0 {
- p.failTimeoutMessages()
- }
-
batchData, sequenceID, callbacks := p.batchBuilder.Flush()
if batchData == nil {
return
@@ -446,46 +445,91 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
}
func (p *partitionProducer) failTimeoutMessages() {
- // since Closing/Closed connection couldn't be reopen, load and compare is safe
- state := p.getProducerState()
- if state == producerClosing || state == producerClosed {
- return
+ diff := func(sentAt time.Time) time.Duration {
+ return p.options.SendTimeout - time.Since(sentAt)
}
- item := p.pendingQueue.Peek()
- if item == nil {
- // pending queue is empty
- return
- }
+ t := time.NewTimer(p.options.SendTimeout)
+ defer t.Stop()
- pi := item.(*pendingItem)
- if time.Since(pi.sentAt) < p.options.SendTimeout {
- // pending messages not timeout yet
- return
- }
+ for range t.C {
+ state := p.getProducerState()
+ if state == producerClosing || state == producerClosed {
+ return
+ }
+
+ item := p.pendingQueue.Peek()
+ if item == nil {
+ // pending queue is empty
+ t.Reset(p.options.SendTimeout)
+ continue
+ }
+ oldestItem := item.(*pendingItem)
+ if nextWaiting := diff(oldestItem.sentAt); nextWaiting > 0 {
+ // none of these pending messages have timed out, wait and retry
+ t.Reset(nextWaiting)
+ continue
+ }
- p.log.Infof("Failing %d messages", p.pendingQueue.Size())
- for p.pendingQueue.Size() > 0 {
- pi = p.pendingQueue.Poll().(*pendingItem)
- pi.Lock()
- for _, i := range pi.sendRequests {
- sr := i.(*sendRequest)
- if sr.msg != nil {
- size := len(sr.msg.Payload)
- p.publishSemaphore.Release()
- p.metrics.MessagesPending.Dec()
- p.metrics.BytesPending.Sub(float64(size))
- p.metrics.PublishErrorsTimeout.Inc()
- p.log.WithError(errSendTimeout).
- WithField("size", size).
- WithField("properties", sr.msg.Properties)
+ // since the pending queue is not thread safe and there is no global iteration lock to control
+ // polling from the pending queue, the current goroutine and the connection receipt handler may
+ // iterate the pending queue at the same time; this may be a performance trade-off
+ // see https://github.com/apache/pulsar-client-go/pull/301
+ curViewItems := p.pendingQueue.ReadableSlice()
+ viewSize := len(curViewItems)
+ if viewSize <= 0 {
+ // double check
+ t.Reset(p.options.SendTimeout)
+ continue
+ }
+ p.log.Infof("Failing %d messages", viewSize)
+ lastViewItem := curViewItems[viewSize-1].(*pendingItem)
+
+ // iterate at most viewSize items
+ for i := 0; i < viewSize; i++ {
+ item := p.pendingQueue.Poll()
+ if item == nil {
+ t.Reset(p.options.SendTimeout)
+ break
}
- if sr.callback != nil {
- sr.callback(nil, sr.msg, errSendTimeout)
+
+ pi := item.(*pendingItem)
+ pi.Lock()
+ if nextWaiting := diff(pi.sentAt); nextWaiting > 0 {
+ // current and subsequent items have not timed out yet, stop iterating
+ t.Reset(nextWaiting)
+ pi.Unlock()
+ break
+ }
+
+ for _, i := range pi.sendRequests {
+ sr := i.(*sendRequest)
+ if sr.msg != nil {
+ size := len(sr.msg.Payload)
+ p.publishSemaphore.Release()
+ p.metrics.MessagesPending.Dec()
+ p.metrics.BytesPending.Sub(float64(size))
+ p.metrics.PublishErrorsTimeout.Inc()
+ p.log.WithError(errSendTimeout).
+ WithField("size", size).
+ WithField("properties", sr.msg.Properties)
+ }
+ if sr.callback != nil {
+ sr.callback(nil, sr.msg, errSendTimeout)
+ }
+ }
+
+ // flag the send as completed with an error so a later flush has no effect
+ pi.completed = true
+ buffersPool.Put(pi.batchData)
+ pi.Unlock()
+
+ // finally reached the last view item, current iteration ends
+ if pi == lastViewItem {
+ t.Reset(p.options.SendTimeout)
+ break
}
}
- buffersPool.Put(pi.batchData)
- pi.Unlock()
}
}
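For reference, a hedged usage sketch showing how the new checker is exercised from the public API (broker URL and topic are placeholders): with a positive `SendTimeout` in `ProducerOptions`, callbacks for messages that cannot be acknowledged in time are now failed by the background goroutine even while the event loop is busy reconnecting.

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	// Placeholder broker URL; adjust for your environment.
	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Per the guard added in this commit, the timeout-checking goroutine
	// only runs when SendTimeout > 0.
	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic:       "persistent://public/default/my-topic",
		SendTimeout: 3 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	producer.SendAsync(context.Background(), &pulsar.ProducerMessage{
		Payload: []byte("hello"),
	}, func(id pulsar.MessageID, msg *pulsar.ProducerMessage, err error) {
		// If the connection stalls, this callback now fires with a send
		// timeout error once SendTimeout elapses, rather than waiting for
		// the next batch flush.
		if err != nil {
			log.Printf("send failed: %v", err)
			return
		}
		log.Printf("published %v", id)
	})

	if err := producer.Flush(); err != nil {
		log.Printf("flush failed: %v", err)
	}
}
```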