This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch branch-0.11.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
commit 191685f80488826ba192d2be4aee7f48cb08cee1 Author: Michael Marshall <[email protected]> AuthorDate: Wed Jul 26 06:09:21 2023 -0500 [fix] Close consumer resources if creation fails (#1070) ### Motivation When a consumer fails to get created, we should close any resources that it created to prevent leaks of internal resources and leaks of the consumer on the broker side. The broker leak could happen if the connection was left open. These fixes are similar to #1061. ### Modifications * Close `ackGroupingTracker` and `chunkedMsgCtxMap` if `grabConn` fails. We cannot call `Close` on the consumer because the state is not `Ready`. If we re-design the consumer, it could be nice to be able to call `Close` in this scenario. * Call `Close` on the consumer in cases where we move it to `Ready` but determine it is not able to be created. * Fix typo in comment (cherry picked from commit a3fcc9a5422c415e0124c3efbf12ecb6596c12ef) --- pulsar/consumer_partition.go | 6 ++++-- pulsar/producer_partition.go | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index b50c0a00..364dae75 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -372,6 +372,8 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon if err != nil { pc.log.WithError(err).Error("Failed to create consumer") pc.nackTracker.Close() + pc.ackGroupingTracker.close() + pc.chunkedMsgCtxMap.Close() return nil, err } pc.log.Info("Created consumer") @@ -381,7 +383,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon if pc.options.startMessageIDInclusive && startingMessageID != nil && startingMessageID.equal(latestMessageID) { msgID, err := pc.requestGetLastMessageID() if err != nil { - pc.nackTracker.Close() + pc.Close() return nil, err } if msgID.entryID != noMessageEntry { @@ -390,7 +392,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon // use the WithoutClear version because the dispatcher is not started yet err = pc.requestSeekWithoutClear(msgID.messageID) if err != nil { - pc.nackTracker.Close() + pc.Close() return nil, err } } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 9f89d7b7..1a40a191 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -1405,7 +1405,7 @@ func (p *partitionProducer) setProducerState(state producerState) { p.state.Swap(int32(state)) } -// set a new consumerState and return the last state +// set a new producerState and return the last state // returns bool if the new state has been set or not func (p *partitionProducer) casProducerState(oldState, newState producerState) bool { return p.state.CAS(int32(oldState), int32(newState))
