wolfstudy commented on a change in pull request #701:
URL: https://github.com/apache/pulsar-client-go/pull/701#discussion_r782684668
##########
File path: pulsar/producer_partition.go
##########
@@ -800,7 +801,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
 		// the state discrepancy.
 		p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v, closing connection", response.GetMessageId(),
 			response.GetSequenceId(), pi.sequenceID)
-		p.cnx.Close()
+		p._getConn().Close()
Review comment:
@freeznet Looking at the current implementation of Close: calling it first
sets the state of the connection to connectionClosed. The next time
GetConnection() is executed, it detects that the cached connection is closed,
removes it from the cache, and creates a new one.
```
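func (c *connection) Close() {
	c.closeOnce.Do(func() {
		c.Lock()
		cnx := c.cnx
		c.Unlock()
		// The state flips to closed before any further teardown.
		c.changeState(connectionClosed)
		// ... (remainder of Close omitted from this excerpt)
	})
}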
```
```
if conn.closed() {
	delete(p.connections, key)
	p.log.Infof("Removed connection from pool key=%s logical_addr=%+v physical_addr=%+v",
		key, conn.logicalAddr, conn.physicalAddr)
	conn = nil // set to nil so we create a new one
}
```
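
To make the sequence above concrete, here is a minimal, self-contained sketch of the same pattern: Close marks the connection as closed exactly once, and the next lookup evicts the stale entry and creates a fresh one. The names `fakeConn`, `fakePool`, `newFakePool`, and the state constants are hypothetical stand-ins for illustration only; they are not the pulsar-client-go types, and the real pool does more (dialing, logging, key construction).

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Hypothetical connection states, loosely mirroring connectionClosed.
const (
	stateReady int32 = iota
	stateClosed
)

// fakeConn is an illustrative stand-in for the client's connection type.
type fakeConn struct {
	id        int
	state     int32
	closeOnce sync.Once
}

// Close marks the connection closed; repeated calls are no-ops.
func (c *fakeConn) Close() {
	c.closeOnce.Do(func() {
		atomic.StoreInt32(&c.state, stateClosed)
	})
}

func (c *fakeConn) closed() bool {
	return atomic.LoadInt32(&c.state) == stateClosed
}

// fakePool is an illustrative stand-in for the connection pool.
type fakePool struct {
	mu          sync.Mutex
	connections map[string]*fakeConn
	nextID      int
}

func newFakePool() *fakePool {
	return &fakePool{connections: make(map[string]*fakeConn)}
}

// GetConnection returns the cached connection for key, replacing it
// with a new one if the cached entry has been closed.
func (p *fakePool) GetConnection(key string) *fakeConn {
	p.mu.Lock()
	defer p.mu.Unlock()

	conn, ok := p.connections[key]
	if ok && conn.closed() {
		delete(p.connections, key)
		conn = nil // set to nil so we create a new one
	}
	if conn == nil {
		p.nextID++
		conn = &fakeConn{id: p.nextID}
		p.connections[key] = conn
	}
	return conn
}

func main() {
	pool := newFakePool()
	first := pool.GetConnection("broker-1")
	first.Close()
	second := pool.GetConnection("broker-1")
	// prints "true false true"
	fmt.Println(first.closed(), second.closed(), first != second)
}
```

In this sketch a caller that still holds a reference to `first` keeps seeing a closed connection, while any caller that goes back through the pool gets the replacement. That is the reason for fetching the connection through an accessor at the point of use rather than caching it in a field.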