This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 65cb19a  Fix producer goroutine leak (#331)
65cb19a is described below

commit 65cb19a70c2c3d49e03d7fd04a36f5e8bd95b256
Author: Christian Simon <[email protected]>
AuthorDate: Fri Jul 24 01:23:15 2020 +0100

    Fix producer goroutine leak (#331)
    
    Per Producer there is a goroutine leaked. The goroutine is used for the
    partition auto-discovery and will never exit.
    
    This changes the goroutine to be properly exiting after a Close() on the
    producer.
---
 pulsar/producer_impl.go | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index 4ee0d8d..2a2b63e 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -59,6 +59,7 @@ type producer struct {
        numPartitions uint32
        messageRouter func(*ProducerMessage, TopicMetadata) int
        ticker        *time.Ticker
+       tickerStop    chan struct{}
 
        log *log.Entry
 }
@@ -118,12 +119,19 @@ func newProducer(client *client, options 
*ProducerOptions) (*producer, error) {
                return nil, err
        }
 
-       p.ticker = time.NewTicker(partitionsAutoDiscoveryInterval)
+       ticker := time.NewTicker(partitionsAutoDiscoveryInterval)
+       p.ticker = ticker
+       p.tickerStop = make(chan struct{})
 
        go func() {
-               for range p.ticker.C {
-                       p.log.Debug("Auto discovering new partitions")
-                       p.internalCreatePartitionsProducers()
+               for {
+                       select {
+                       case <-ticker.C:
+                               p.log.Debug("Auto discovering new partitions")
+                               p.internalCreatePartitionsProducers()
+                       case <-p.tickerStop:
+                               return
+                       }
                }
        }()
 
@@ -282,6 +290,8 @@ func (p *producer) Close() {
        defer p.RUnlock()
        if p.ticker != nil {
                p.ticker.Stop()
+               close(p.tickerStop)
+               p.ticker = nil
        }
 
        for _, pp := range p.producers {

Reply via email to