This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new c75aa62  fix producer block (#326)
c75aa62 is described below

commit c75aa626183fb4086a9438215e9c55fed16565d9
Author: faker <[email protected]>
AuthorDate: Fri Jul 24 10:14:10 2020 +0800

    fix producer block (#326)
    
    Co-authored-by: 灰柯 <[email protected]>
---
 pulsar/consumer_partition.go  | 4 +++-
 pulsar/internal/connection.go | 2 +-
 pulsar/producer_partition.go  | 3 ++-
 3 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 0c723c8..b83cb88 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -180,7 +180,9 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
                dlq:                  dlq,
                log:                  log.WithField("topic", options.topic),
        }
-       pc.log = pc.log.WithField("name", pc.name).WithField("subscription", options.subscription)
+       pc.log = pc.log.WithField("name", pc.name).
+               WithField("subscription", options.subscription).
+               WithField("consumerID", pc.consumerID)
        pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay)
 
        err := pc.grabConn()
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 703c773..4be9ba2 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -636,7 +636,7 @@ func (c *connection) handleCloseConsumer(closeConsumer *pb.CommandCloseConsumer)
 
        if consumer, ok := c.consumerHandler(consumerID); ok {
                consumer.ConnectionClosed()
-               delete(c.listeners, consumerID)
+               c.DeleteConsumeHandler(consumerID)
        } else {
                c.log.WithField("consumerID", consumerID).Warnf("Consumer with ID not found while closing consumer")
        }
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 3832d69..18f22f6 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -150,7 +150,8 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
                return nil, err
        }
 
-       p.log = p.log.WithField("producer_name", p.producerName)
+       p.log = p.log.WithField("producer_name", p.producerName).
+               WithField("producerID", p.producerID)
        p.log.WithField("cnx", p.cnx.ID()).Info("Created producer")
        atomic.StoreInt32(&p.state, producerReady)
 

Reply via email to