This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e76dc2  asynchronized send timeout checking (#460)
0e76dc2 is described below

commit 0e76dc2f6e8f4857d377ac9422e2df5f715b8ae2
Author: wuYin <[email protected]>
AuthorDate: Tue Feb 9 10:41:58 2021 +0800

    asynchronized send timeout checking (#460)
    
    Fixes #458
    
    ### Motivation
    
    For current producer, send timeout checking triggered by interval batch 
flush
    If connection closed, the producer eventloop will blocked to reconnect to 
broker, lead to batch flush and send timeout checking take no effective, 
java-client timer did effective in this situation
    
    ### Modifications
    
    Asynchronized send timeout by running in independent goroutine until 
producer closed, and without a pending queue lock
    
    ### Verifying this change
    
    - [x] Make sure that the change passes the CI checks.
    
    ### Others
    
    Without pending queue lock, the send timeout checking  gets more 
complicated, I don't know if it's worth it for performance.
---
 pulsar/producer_partition.go | 118 +++++++++++++++++++++++++++++--------------
 1 file changed, 81 insertions(+), 37 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 2536909..41a1b9b 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -140,6 +140,9 @@ func newPartitionProducer(client *client, topic string, 
options *ProducerOptions
        p.log.WithField("cnx", p.cnx.ID()).Info("Created producer")
        p.setProducerState(producerReady)
 
+       if p.options.SendTimeout > 0 {
+               go p.failTimeoutMessages()
+       }
        go p.runEventsLoop()
 
        return p, nil
@@ -427,10 +430,6 @@ type pendingItem struct {
 }
 
 func (p *partitionProducer) internalFlushCurrentBatch() {
-       if p.options.SendTimeout > 0 {
-               p.failTimeoutMessages()
-       }
-
        batchData, sequenceID, callbacks := p.batchBuilder.Flush()
        if batchData == nil {
                return
@@ -446,46 +445,91 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
 }
 
 func (p *partitionProducer) failTimeoutMessages() {
-       // since Closing/Closed connection couldn't be reopen, load and compare 
is safe
-       state := p.getProducerState()
-       if state == producerClosing || state == producerClosed {
-               return
+       diff := func(sentAt time.Time) time.Duration {
+               return p.options.SendTimeout - time.Since(sentAt)
        }
 
-       item := p.pendingQueue.Peek()
-       if item == nil {
-               // pending queue is empty
-               return
-       }
+       t := time.NewTimer(p.options.SendTimeout)
+       defer t.Stop()
 
-       pi := item.(*pendingItem)
-       if time.Since(pi.sentAt) < p.options.SendTimeout {
-               // pending messages not timeout yet
-               return
-       }
+       for range t.C {
+               state := p.getProducerState()
+               if state == producerClosing || state == producerClosed {
+                       return
+               }
+
+               item := p.pendingQueue.Peek()
+               if item == nil {
+                       // pending queue is empty
+                       t.Reset(p.options.SendTimeout)
+                       continue
+               }
+               oldestItem := item.(*pendingItem)
+               if nextWaiting := diff(oldestItem.sentAt); nextWaiting > 0 {
+                       // none of these pending messages have timed out, wait 
and retry
+                       t.Reset(nextWaiting)
+                       continue
+               }
 
-       p.log.Infof("Failing %d messages", p.pendingQueue.Size())
-       for p.pendingQueue.Size() > 0 {
-               pi = p.pendingQueue.Poll().(*pendingItem)
-               pi.Lock()
-               for _, i := range pi.sendRequests {
-                       sr := i.(*sendRequest)
-                       if sr.msg != nil {
-                               size := len(sr.msg.Payload)
-                               p.publishSemaphore.Release()
-                               p.metrics.MessagesPending.Dec()
-                               p.metrics.BytesPending.Sub(float64(size))
-                               p.metrics.PublishErrorsTimeout.Inc()
-                               p.log.WithError(errSendTimeout).
-                                       WithField("size", size).
-                                       WithField("properties", 
sr.msg.Properties)
+               // since pending queue is not thread safe because of there is 
no global iteration lock
+               // to control poll from pending queue, current goroutine and 
connection receipt handler
+               // iterate pending queue at the same time, this maybe a 
performance trade-off
+               // see https://github.com/apache/pulsar-client-go/pull/301
+               curViewItems := p.pendingQueue.ReadableSlice()
+               viewSize := len(curViewItems)
+               if viewSize <= 0 {
+                       // double check
+                       t.Reset(p.options.SendTimeout)
+                       continue
+               }
+               p.log.Infof("Failing %d messages", viewSize)
+               lastViewItem := curViewItems[viewSize-1].(*pendingItem)
+
+               // iterate at most viewSize items
+               for i := 0; i < viewSize; i++ {
+                       item := p.pendingQueue.Poll()
+                       if item == nil {
+                               t.Reset(p.options.SendTimeout)
+                               break
                        }
-                       if sr.callback != nil {
-                               sr.callback(nil, sr.msg, errSendTimeout)
+
+                       pi := item.(*pendingItem)
+                       pi.Lock()
+                       if nextWaiting := diff(pi.sentAt); nextWaiting > 0 {
+                               // current and subsequent items not timeout 
yet, stop iterating
+                               t.Reset(nextWaiting)
+                               pi.Unlock()
+                               break
+                       }
+
+                       for _, i := range pi.sendRequests {
+                               sr := i.(*sendRequest)
+                               if sr.msg != nil {
+                                       size := len(sr.msg.Payload)
+                                       p.publishSemaphore.Release()
+                                       p.metrics.MessagesPending.Dec()
+                                       
p.metrics.BytesPending.Sub(float64(size))
+                                       p.metrics.PublishErrorsTimeout.Inc()
+                                       p.log.WithError(errSendTimeout).
+                                               WithField("size", size).
+                                               WithField("properties", 
sr.msg.Properties)
+                               }
+                               if sr.callback != nil {
+                                       sr.callback(nil, sr.msg, errSendTimeout)
+                               }
+                       }
+
+                       // flag the send has completed with error, flush make 
no effect
+                       pi.completed = true
+                       buffersPool.Put(pi.batchData)
+                       pi.Unlock()
+
+                       // finally reached the last view item, current 
iteration ends
+                       if pi == lastViewItem {
+                               t.Reset(p.options.SendTimeout)
+                               break
                        }
                }
-               buffersPool.Put(pi.batchData)
-               pi.Unlock()
        }
 }
 

Reply via email to