This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 861d7af  Avoid contention on producer mutex on critical path (#286)
861d7af is described below

commit 861d7af1fbc7d9b252fc430fccb9db7dec9e7924
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Jun 18 14:57:45 2020 -0700

    Avoid contention on producer mutex on critical path (#286)
    
    * Avoid contention on producer mutex on critical path
    
    * Circumvent the race detector
    
    * Removed space
---
 pulsar/producer_impl.go | 47 +++++++++++++++++++++++++++--------------------
 1 file changed, 27 insertions(+), 20 deletions(-)

diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index fc9019a..35dae28 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -22,6 +22,7 @@ import (
        "sync"
        "sync/atomic"
        "time"
+       "unsafe"
 
        log "github.com/sirupsen/logrus"
 
@@ -29,11 +30,12 @@ import (
 )
 
 type producer struct {
-       sync.Mutex
+       sync.RWMutex
        client        *client
        options       *ProducerOptions
        topic         string
        producers     []Producer
+       producersPtr  unsafe.Pointer
        numPartitions uint32
        messageRouter func(*ProducerMessage, TopicMetadata) int
        ticker        *time.Ticker
@@ -115,6 +117,7 @@ func (p *producer) internalCreatePartitionsProducers() error {
 
        p.Lock()
        defer p.Unlock()
+
        oldProducers := p.producers
 
        if oldProducers != nil {
@@ -179,6 +182,7 @@ func (p *producer) internalCreatePartitionsProducers() error {
                return err
        }
 
+       atomic.StorePointer(&p.producersPtr, unsafe.Pointer(&p.producers))
        atomic.StoreUint32(&p.numPartitions, uint32(len(p.producers)))
        return nil
 }
@@ -188,8 +192,8 @@ func (p *producer) Topic() string {
 }
 
 func (p *producer) Name() string {
-       p.Lock()
-       defer p.Unlock()
+       p.RLock()
+       defer p.RUnlock()
 
        return p.producers[0].Name()
 }
@@ -199,27 +203,30 @@ func (p *producer) NumPartitions() uint32 {
 }
 
 func (p *producer) Send(ctx context.Context, msg *ProducerMessage) (MessageID, error) {
-       p.Lock()
-       partition := p.messageRouter(msg, p)
-       pp := p.producers[partition]
-       p.Unlock()
-
-       return pp.Send(ctx, msg)
+       return p.getPartition(msg).Send(ctx, msg)
 }
 
 func (p *producer) SendAsync(ctx context.Context, msg *ProducerMessage,
        callback func(MessageID, *ProducerMessage, error)) {
-       p.Lock()
-       partition := p.messageRouter(msg, p)
-       pp := p.producers[partition]
-       p.Unlock()
+       p.getPartition(msg).SendAsync(ctx, msg, callback)
+}
 
-       pp.SendAsync(ctx, msg, callback)
+func (p *producer) getPartition(msg *ProducerMessage) Producer {
+       // Since partitions can only increase, it's ok if the producers list
+       // is updated in between. The numPartitions is updated only after the list.
+       partition := p.messageRouter(msg, p)
+       producers := *(*[]Producer)(atomic.LoadPointer(&p.producersPtr))
+       if partition >= len(producers) {
+               // We read the old producers list while the count was already
+               // updated
+               partition %= len(producers)
+       }
+       return producers[partition]
 }
 
 func (p *producer) LastSequenceID() int64 {
-       p.Lock()
-       defer p.Unlock()
+       p.RLock()
+       defer p.RUnlock()
 
        var maxSeq int64 = -1
        for _, pp := range p.producers {
@@ -232,8 +239,8 @@ func (p *producer) LastSequenceID() int64 {
 }
 
 func (p *producer) Flush() error {
-       p.Lock()
-       defer p.Unlock()
+       p.RLock()
+       defer p.RUnlock()
 
        for _, pp := range p.producers {
                if err := pp.Flush(); err != nil {
@@ -245,8 +252,8 @@ func (p *producer) Flush() error {
 }
 
 func (p *producer) Close() {
-       p.Lock()
-       defer p.Unlock()
+       p.RLock()
+       defer p.RUnlock()
        if p.ticker != nil {
                p.ticker.Stop()
        }

Reply via email to