This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new a3fcc9a  [fix] Close consumer resources if creation fails (#1070)
a3fcc9a is described below

commit a3fcc9a5422c415e0124c3efbf12ecb6596c12ef
Author: Michael Marshall <[email protected]>
AuthorDate: Wed Jul 26 06:09:21 2023 -0500

    [fix] Close consumer resources if creation fails (#1070)
    
    ### Motivation
    
    When a consumer fails to get created, we should close any resources that it 
created to prevent leaks of internal resources and leaks of the consumer on the 
broker side. The broker leak could happen if the connection was left open. 
These fixes are similar to #1061.
    
    ### Modifications
    
    * Close `ackGroupingTracker` and `chunkedMsgCtxMap` if `grabConn` fails. We 
cannot call `Close` on the consumer because the state is not `Ready`. If we 
re-design the consumer, it could be nice to be able to call `Close` in this 
scenario.
    * Call `Close` on the consumer in cases where we move it to `Ready` but 
determine it is not able to be created.
    * Fix typo in comment
---
 pulsar/consumer_partition.go | 6 ++++--
 pulsar/producer_partition.go | 2 +-
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index b50c0a0..364dae7 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -372,6 +372,8 @@ func newPartitionConsumer(parent Consumer, client *client, 
options *partitionCon
        if err != nil {
                pc.log.WithError(err).Error("Failed to create consumer")
                pc.nackTracker.Close()
+               pc.ackGroupingTracker.close()
+               pc.chunkedMsgCtxMap.Close()
                return nil, err
        }
        pc.log.Info("Created consumer")
@@ -381,7 +383,7 @@ func newPartitionConsumer(parent Consumer, client *client, 
options *partitionCon
        if pc.options.startMessageIDInclusive && startingMessageID != nil && 
startingMessageID.equal(latestMessageID) {
                msgID, err := pc.requestGetLastMessageID()
                if err != nil {
-                       pc.nackTracker.Close()
+                       pc.Close()
                        return nil, err
                }
                if msgID.entryID != noMessageEntry {
@@ -390,7 +392,7 @@ func newPartitionConsumer(parent Consumer, client *client, 
options *partitionCon
                        // use the WithoutClear version because the dispatcher 
is not started yet
                        err = pc.requestSeekWithoutClear(msgID.messageID)
                        if err != nil {
-                               pc.nackTracker.Close()
+                               pc.Close()
                                return nil, err
                        }
                }
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 6bdb090..d3f61ef 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -1340,7 +1340,7 @@ func (p *partitionProducer) setProducerState(state 
producerState) {
        p.state.Swap(int32(state))
 }
 
-// set a new consumerState and return the last state
+// set a new producerState and return the last state
 // returns bool if the new state has been set or not
 func (p *partitionProducer) casProducerState(oldState, newState producerState) 
bool {
        return p.state.CAS(int32(oldState), int32(newState))

Reply via email to