This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new d336ff7 Remove multitopic and regexp consumers along with reader from
client's handlers map when Close is called. (#620)
d336ff7 is described below
commit d336ff717c98b30d976dd1ebf9b1acf76e05c695
Author: PowerStateFailure <[email protected]>
AuthorDate: Sat Oct 9 12:30:57 2021 +0500
Remove multitopic and regexp consumers along with reader from client's
handlers map when Close is called. (#620)
This change fixes a memory leak when frequently using short-lived regexp or
multitopic consumers, because they were not removed from the client's handlers map on Close.
Co-authored-by: xiaolongran <[email protected]>
---
pulsar/consumer_multitopic.go | 4 ++++
pulsar/consumer_regex.go | 1 +
pulsar/reader_impl.go | 3 +++
3 files changed, 8 insertions(+)
diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index dc4ad7b..faf8917 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -30,6 +30,8 @@ import (
)
type multiTopicConsumer struct {
+ client *client
+
options ConsumerOptions
consumerName string
@@ -48,6 +50,7 @@ type multiTopicConsumer struct {
func newMultiTopicConsumer(client *client, options ConsumerOptions, topics
[]string,
messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter)
(Consumer, error) {
mtc := &multiTopicConsumer{
+ client: client,
options: options,
messageCh: messageCh,
consumers: make(map[string]Consumer, len(topics)),
@@ -186,6 +189,7 @@ func (c *multiTopicConsumer) Close() {
}
wg.Wait()
close(c.closeCh)
+ c.client.handlers.Del(c)
c.dlq.close()
c.rlq.close()
})
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index 9e0c125..2f46c48 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -217,6 +217,7 @@ func (c *regexConsumer) Close() {
}(con)
}
wg.Wait()
+ c.client.handlers.Del(c)
c.dlq.close()
c.rlq.close()
})
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index a019d9c..9983286 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -33,6 +33,7 @@ const (
type reader struct {
sync.Mutex
+ client *client
pc *partitionConsumer
messageCh chan ConsumerMessage
lastMessageInBroker trackingMessageID
@@ -91,6 +92,7 @@ func newReader(client *client, options ReaderOptions)
(Reader, error) {
}
reader := &reader{
+ client: client,
messageCh: make(chan ConsumerMessage),
log: client.log.SubLogger(log.Fields{"topic":
options.Topic}),
metrics: client.metrics.GetTopicMetrics(options.Topic),
@@ -174,6 +176,7 @@ func (r *reader) hasMoreMessages() bool {
func (r *reader) Close() {
r.pc.Close()
+ r.client.handlers.Del(r)
r.metrics.ReadersClosed.Inc()
}