This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 67e2075 Revert "Fix stuck when reconnect broker (#703)" (#767)
67e2075 is described below
commit 67e2075c618ac967008fa713ef3e172d64e068e5
Author: Lari Hotari <[email protected]>
AuthorDate: Mon May 23 06:41:42 2022 +0300
Revert "Fix stuck when reconnect broker (#703)" (#767)
This reverts commit 1a8432cfd3aa231f8eb3c97171a47eab98a8f20a.
---
pulsar/internal/connection.go | 4 ----
pulsar/internal/connection_pool.go | 2 +-
2 files changed, 1 insertion(+), 5 deletions(-)
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 11c9d49..da9a901 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -823,8 +823,6 @@ func (c *connection) handleCloseConsumer(closeConsumer
*pb.CommandCloseConsumer)
consumerID := closeConsumer.GetConsumerId()
c.log.Infof("Broker notification of Closed consumer: %d", consumerID)
- c.changeState(connectionClosed)
-
if consumer, ok := c.consumerHandler(consumerID); ok {
consumer.ConnectionClosed()
c.DeleteConsumeHandler(consumerID)
@@ -837,8 +835,6 @@ func (c *connection) handleCloseProducer(closeProducer
*pb.CommandCloseProducer)
c.log.Infof("Broker notification of Closed producer: %d",
closeProducer.GetProducerId())
producerID := closeProducer.GetProducerId()
- c.changeState(connectionClosed)
-
producer, ok := c.deletePendingProducers(producerID)
// did we find a producer?
if ok {
diff --git a/pulsar/internal/connection_pool.go
b/pulsar/internal/connection_pool.go
index 6491abd..5ec457e 100644
--- a/pulsar/internal/connection_pool.go
+++ b/pulsar/internal/connection_pool.go
@@ -82,7 +82,7 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL,
physicalAddr *url.U
// current connection is closed, we need to remove the
connection object from the current
// connection pool and create a new connection.
if conn.closed() {
- p.log.Infof("Removed connection from pool key=%s
logical_addr=%+v physical_addr=%+v",
+ p.log.Debugf("Removed connection from pool key=%s
logical_addr=%+v physical_addr=%+v",
key, conn.logicalAddr, conn.physicalAddr)
delete(p.connections, key)
conn.Close()