This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 65cb19a Fix producer goroutine leak (#331)
65cb19a is described below
commit 65cb19a70c2c3d49e03d7fd04a36f5e8bd95b256
Author: Christian Simon <[email protected]>
AuthorDate: Fri Jul 24 01:23:15 2020 +0100
Fix producer goroutine leak (#331)
Each Producer leaks one goroutine. The goroutine is used for partition
auto-discovery and never exits.
This change makes the goroutine exit properly after Close() is called on the
producer.
---
pulsar/producer_impl.go | 18 ++++++++++++++----
1 file changed, 14 insertions(+), 4 deletions(-)
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index 4ee0d8d..2a2b63e 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -59,6 +59,7 @@ type producer struct {
numPartitions uint32
messageRouter func(*ProducerMessage, TopicMetadata) int
ticker *time.Ticker
+ tickerStop chan struct{}
log *log.Entry
}
@@ -118,12 +119,19 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) {
return nil, err
}
- p.ticker = time.NewTicker(partitionsAutoDiscoveryInterval)
+ ticker := time.NewTicker(partitionsAutoDiscoveryInterval)
+ p.ticker = ticker
+ p.tickerStop = make(chan struct{})
go func() {
- for range p.ticker.C {
- p.log.Debug("Auto discovering new partitions")
- p.internalCreatePartitionsProducers()
+ for {
+ select {
+ case <-ticker.C:
+ p.log.Debug("Auto discovering new partitions")
+ p.internalCreatePartitionsProducers()
+ case <-p.tickerStop:
+ return
+ }
}
}()
@@ -282,6 +290,8 @@ func (p *producer) Close() {
defer p.RUnlock()
if p.ticker != nil {
p.ticker.Stop()
+ close(p.tickerStop)
+ p.ticker = nil
}
for _, pp := range p.producers {