This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch branch-0.11.0
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

commit 191685f80488826ba192d2be4aee7f48cb08cee1
Author: Michael Marshall <[email protected]>
AuthorDate: Wed Jul 26 06:09:21 2023 -0500

    [fix] Close consumer resources if creation fails (#1070)
    
    ### Motivation
    
    When a consumer fails to be created, we should close any resources that it
    created to prevent leaks of internal resources and leaks of the consumer on
    the broker side. The broker leak could happen if the connection was left
    open. These fixes are similar to #1061.
    
    ### Modifications
    
    * Close `ackGroupingTracker` and `chunkedMsgCtxMap` if `grabConn` fails. We
      cannot call `Close` on the consumer because the state is not `Ready`. If
      we re-design the consumer, it could be nice to be able to call `Close` in
      this scenario.
    * Call `Close` on the consumer in cases where we move it to `Ready` but
      determine that it cannot be created.
    * Fix typo in comment
    
    (cherry picked from commit a3fcc9a5422c415e0124c3efbf12ecb6596c12ef)
---
 pulsar/consumer_partition.go | 6 ++++--
 pulsar/producer_partition.go | 2 +-
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index b50c0a00..364dae75 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -372,6 +372,8 @@ func newPartitionConsumer(parent Consumer, client *client, 
options *partitionCon
        if err != nil {
                pc.log.WithError(err).Error("Failed to create consumer")
                pc.nackTracker.Close()
+               pc.ackGroupingTracker.close()
+               pc.chunkedMsgCtxMap.Close()
                return nil, err
        }
        pc.log.Info("Created consumer")
@@ -381,7 +383,7 @@ func newPartitionConsumer(parent Consumer, client *client, 
options *partitionCon
        if pc.options.startMessageIDInclusive && startingMessageID != nil && 
startingMessageID.equal(latestMessageID) {
                msgID, err := pc.requestGetLastMessageID()
                if err != nil {
-                       pc.nackTracker.Close()
+                       pc.Close()
                        return nil, err
                }
                if msgID.entryID != noMessageEntry {
@@ -390,7 +392,7 @@ func newPartitionConsumer(parent Consumer, client *client, 
options *partitionCon
                        // use the WithoutClear version because the dispatcher 
is not started yet
                        err = pc.requestSeekWithoutClear(msgID.messageID)
                        if err != nil {
-                               pc.nackTracker.Close()
+                               pc.Close()
                                return nil, err
                        }
                }
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 9f89d7b7..1a40a191 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -1405,7 +1405,7 @@ func (p *partitionProducer) setProducerState(state 
producerState) {
        p.state.Swap(int32(state))
 }
 
-// set a new consumerState and return the last state
+// set a new producerState and return the last state
 // returns bool if the new state has been set or not
 func (p *partitionProducer) casProducerState(oldState, newState producerState) 
bool {
        return p.state.CAS(int32(oldState), int32(newState))

Reply via email to