This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e2422b  Fix using closed connection in consumer (#785)
1e2422b is described below

commit 1e2422bc5cfec92e7fec5df5e48b8e766a2d6288
Author: hrsakai <[email protected]>
AuthorDate: Fri Jun 24 04:43:52 2022 +0900

    Fix using closed connection in consumer (#785)
---
 pulsar/consumer_partition.go  |  6 +++++-
 pulsar/internal/connection.go | 12 +++++++-----
 2 files changed, 12 insertions(+), 6 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 1ddcc39..4cc1645 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -1187,7 +1187,11 @@ func (pc *partitionConsumer) grabConn() error {
 
        pc._setConn(res.Cnx)
        pc.log.Info("Connected consumer")
-       pc._getConn().AddConsumeHandler(pc.consumerID, pc)
+       err = pc._getConn().AddConsumeHandler(pc.consumerID, pc)
+       if err != nil {
+               pc.log.WithError(err).Error("Failed to add consumer handler")
+               return err
+       }
 
        msgType := res.Response.GetType()
 
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index fa8d055..0249020 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -53,8 +53,9 @@ type TLSOptions struct {
 }
 
 var (
-       errConnectionClosed       = errors.New("connection closed")
-       errUnableRegisterListener = errors.New("unable register listener when 
con closed")
+       errConnectionClosed        = errors.New("connection closed")
+       errUnableRegisterListener  = errors.New("unable register listener when 
con closed")
+       errUnableAddConsumeHandler = errors.New("unable add consumer handler 
when con closed")
 )
 
 // ConnectionListener is a user of a connection (eg. a producer or
@@ -75,7 +76,7 @@ type Connection interface {
        WriteData(data Buffer)
        RegisterListener(id uint64, listener ConnectionListener) error
        UnregisterListener(id uint64)
-       AddConsumeHandler(id uint64, handler ConsumerHandler)
+       AddConsumeHandler(id uint64, handler ConsumerHandler) error
        DeleteConsumeHandler(id uint64)
        ID() string
        GetMaxMessageSize() int32
@@ -994,16 +995,17 @@ func (c *connection) getTLSConfig() (*tls.Config, error) {
        return tlsConfig, nil
 }
 
-func (c *connection) AddConsumeHandler(id uint64, handler ConsumerHandler) {
+func (c *connection) AddConsumeHandler(id uint64, handler ConsumerHandler) 
error {
        // do not add if connection is closed
        if c.closed() {
                c.log.Warnf("Closed connection unable add consumer with 
id=%+v", id)
-               return
+               return errUnableAddConsumeHandler
        }
 
        c.consumerHandlersLock.Lock()
        defer c.consumerHandlersLock.Unlock()
        c.consumerHandlers[id] = handler
+       return nil
 }
 
 func (c *connection) DeleteConsumeHandler(id uint64) {

Reply via email to