freeznet opened a new issue #153: Memory leaks when producer/consumer close URL: https://github.com/apache/pulsar-client-go/issues/153 #### Expected behavior When `producer.Close()` or `consumer.Close()`, all related memory should be freed. #### Actual behavior When we rapidly start and close multiple producers (or consumers), memory cost of the process are becomes very large. `pprof` results as follows: ``` $ go tool pprof -base ./mem10.out mem99.out Type: inuse_space Time: Dec 30, 2019 at 5:32pm (CST) Entering interactive mode (type "help" for commands, "o" for options) (pprof) top Showing nodes accounting for 4017.42kB, 100% of 4017.42kB total Showing top 10 nodes out of 13 flat flat% sum% cum cum% 2600.21kB 64.72% 64.72% 2600.21kB 64.72% github.com/apache/pulsar-client-go/pulsar/internal.NewBlockingQueue 902.59kB 22.47% 87.19% 902.59kB 22.47% compress/flate.NewWriter 514.63kB 12.81% 100% 1417.21kB 35.28% runtime/pprof.(*profileBuilder).locForPC 0 0% 100% 902.59kB 22.47% compress/gzip.(*Writer).Write 0 0% 100% 2600.21kB 64.72% github.com/apache/pulsar-client-go/pulsar.newPartitionProducer 0 0% 100% 2600.21kB 64.72% github.com/apache/pulsar-client-go/pulsar.newProducer.func2 0 0% 100% 1417.21kB 35.28% main.main 0 0% 100% 1417.21kB 35.28% runtime.main 0 0% 100% 902.59kB 22.47% runtime/pprof.(*profileBuilder).flush 0 0% 100% 1417.21kB 35.28% runtime/pprof.WriteHeapProfile ``` after 100 iterations of create and close producer with different topic, a possible memory leak was found (`github.com/apache/pulsar-client-go/pulsar/internal.NewBlockingQueue`). Which means `partitionProducer`s are not been garbage collected. After code reviews, we found that `ClientHandlers` only cleanup the cached `producer` and `consumer` after `client.Close()` been called. Which caused the closed `producer` still been referenced by `ClientHandlers` and not been garbage collected. #### Steps to reproduce ``` initLogger(true) client, err := pulsar.NewClient(pulsar.ClientOptions{ URL: "pulsar://127.0.0.1:16650", }) defer client.Close() if err != nil { log.Fatal(err) return } defer client.Close() for jj :=0; jj<500; jj++{ producer, err := client.CreateProducer(pulsar.ProducerOptions{ Topic: fmt.Sprintf("topic-%d", jj), }) if err != nil { log.Fatal(err) return } ctx := context.Background() for i := 0; i < 10; i++ { if msgId, err := producer.Send(ctx, &pulsar.ProducerMessage{ Payload: []byte(fmt.Sprintf("hello-%d", i)), }); err != nil { log.Fatal(err) } else { log.Println("Published message: ", msgId) } } producer.Close() fm, err := os.OpenFile(fmt.Sprintf("./mem%d.out", jj), os.O_RDWR|os.O_CREATE, 0644) if err != nil { log.Fatal(err) } pprof.WriteHeapProfile(fm) fm.Close() } ``` #### System configuration **Pulsar version**: 2.4.2
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
